diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 0069573c45..c174e672ad 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -386,6 +386,24 @@ pg_proc.oid Final function (zero if none) + + aggmtransfn + regproc + pg_proc.oid + Forward transition function for moving-aggregate mode (zero if none) + + + aggminvtransfn + regproc + pg_proc.oid + Inverse transition function for moving-aggregate mode (zero if none) + + + aggmfinalfn + regproc + pg_proc.oid + Final function for moving-aggregate mode (zero if none) + aggsortop oid @@ -405,6 +423,20 @@ Approximate average size (in bytes) of the transition state data, or zero to use a default estimate + + aggmtranstype + oid + pg_type.oid + Data type of the aggregate function's internal transition (state) + data for moving-aggregate mode (zero if none) + + + aggmtransspace + int4 + + Approximate average size (in bytes) of the transition state data + for moving-aggregate mode, or zero to use a default estimate + agginitval text @@ -416,6 +448,17 @@ value starts out null. + + aggminitval + text + + + The initial value of the transition state for moving-aggregate mode. + This is a text field containing the initial value in its external + string representation. If this field is null, the transition state + value starts out null. 
+ + diff --git a/doc/src/sgml/ref/create_aggregate.sgml b/doc/src/sgml/ref/create_aggregate.sgml index e5fc718654..268acf3e84 100644 --- a/doc/src/sgml/ref/create_aggregate.sgml +++ b/doc/src/sgml/ref/create_aggregate.sgml @@ -27,6 +27,12 @@ CREATE AGGREGATE name ( [ state_data_size ] [ , FINALFUNC = ffunc ] [ , INITCOND = initial_condition ] + [ , MSFUNC = msfunc ] + [ , MINVFUNC = minvfunc ] + [ , MSTYPE = mstate_data_type ] + [ , MSSPACE = mstate_data_size ] + [ , MFINALFUNC = mffunc ] + [ , MINITCOND = minitial_condition ] [ , SORTOP = sort_operator ] ) @@ -49,6 +55,12 @@ CREATE AGGREGATE name ( [ , SSPACE = state_data_size ] [ , FINALFUNC = ffunc ] [ , INITCOND = initial_condition ] + [ , MSFUNC = msfunc ] + [ , MINVFUNC = minvfunc ] + [ , MSTYPE = mstate_data_type ] + [ , MSSPACE = mstate_data_size ] + [ , MFINALFUNC = mffunc ] + [ , MINITCOND = minitial_condition ] [ , SORTOP = sort_operator ] ) @@ -84,7 +96,7 @@ CREATE AGGREGATE name ( - An aggregate function is made from one or two ordinary + A simple aggregate function is made from one or two ordinary functions: a state transition function sfunc, @@ -126,7 +138,7 @@ CREATE AGGREGATE name ( values are ignored (the function is not called and the previous state value is retained). If the initial state value is null, then at the first row with all-nonnull input values, the first argument value replaces the state - value, and the transition function is invoked at subsequent rows with + value, and the transition function is invoked at each subsequent row with all-nonnull input values. This is handy for implementing aggregates like max. Note that this behavior is only available when @@ -154,6 +166,18 @@ CREATE AGGREGATE name ( input rows. + + An aggregate can optionally support moving-aggregate mode, + as described in . This requires + specifying the MSFUNC, MINVFUNC, + and MSTYPE parameters, and optionally + the MSSPACE, MFINALFUNC, + and MINITCOND parameters. 
Except for MINVFUNC, + these parameters work like the corresponding simple-aggregate parameters + without M; they define a separate implementation of the + aggregate that includes an inverse transition function. + + The syntax with ORDER BY in the parameter list creates a special type of aggregate called an ordered-set @@ -197,8 +221,8 @@ SELECT col FROM tab ORDER BY col USING sortop LIMIT 1; To be able to create an aggregate function, you must have USAGE privilege on the argument types, the state - type, and the return type, as well as EXECUTE privilege - on the transition and final functions. + type(s), and the return type, as well as EXECUTE + privilege on the transition and final functions. @@ -359,6 +383,79 @@ SELECT col FROM tab ORDER BY col USING sortop LIMIT 1; + + msfunc + + + The name of the forward state transition function to be called for each + input row in moving-aggregate mode. This is exactly like the regular + transition function, except that its first argument and result are of + type mstate_data_type, which might be different + from state_data_type. + + + + + + minvfunc + + + The name of the inverse state transition function to be used in + moving-aggregate mode. This function has the same argument and + result types as msfunc, but it is used to remove + a value from the current aggregate state, rather than add a value to + it. The inverse transition function must have the same strictness + attribute as the forward state transition function. + + + + + + mstate_data_type + + + The data type for the aggregate's state value, when using + moving-aggregate mode. + + + + + + mstate_data_size + + + The approximate average size (in bytes) of the aggregate's state + value, when using moving-aggregate mode. This works the same as + state_data_size. + + + + + + mffunc + + + The name of the final function called to compute the aggregate's + result after all input rows have been traversed, when using + moving-aggregate mode. 
This works the same as ffunc, + except that its input type is mstate_data_type. + The aggregate result type determined by mffunc + and mstate_data_type must match that determined by the + aggregate's regular implementation. + + + + + + minitial_condition + + + The initial setting for the state value, when using moving-aggregate + mode. This works the same as initial_condition. + + + + sort_operator @@ -397,6 +494,49 @@ SELECT col FROM tab ORDER BY col USING sortop LIMIT 1; Notes + + If an aggregate supports moving-aggregate mode, it will improve + calculation efficiency when the aggregate is used as a window function + for a window with moving frame start (that is, a frame start mode other + than UNBOUNDED PRECEDING). Conceptually, the forward + transition function adds input values to the aggregate's state when + they enter the window frame from the bottom, and the inverse transition + function removes them again when they leave the frame at the top. So, + when values are removed, they are always removed in the same order they + were added. Whenever the inverse transition function is invoked, it will + thus receive the earliest added but not yet removed argument value(s). + The inverse transition function can assume that at least one row will + remain in the current state after it removes the oldest row. (When this + would not be the case, the window function mechanism simply starts a + fresh aggregation, rather than using the inverse transition function.) + + + + The forward transition function for moving-aggregate mode is not + allowed to return NULL as the new state value. If the inverse + transition function returns NULL, this is taken as an indication that + the inverse function cannot reverse the state calculation for this + particular input, and so the aggregate calculation will be redone from + scratch for the current frame starting position. 
This convention + allows moving-aggregate mode to be used in situations where there are + some infrequent cases that are impractical to reverse out of the + running state value. + + + + If no moving-aggregate implementation is supplied, + the aggregate can still be used with moving frames, + but PostgreSQL will recompute the whole + aggregation whenever the start of the frame moves. + Note that whether or not the aggregate supports moving-aggregate + mode, PostgreSQL can handle a moving frame + end without recalculation; this is done by continuing to add new values + to the aggregate's state. It is assumed that the final function does + not damage the aggregate's state value, so that the aggregation can be + continued even after an aggregate result value has been obtained for + one set of frame boundaries. + + The syntax for ordered-set aggregates allows VARIADIC to be specified for both the last direct parameter and the last @@ -415,6 +555,11 @@ SELECT col FROM tab ORDER BY col USING sortop LIMIT 1; ones; any preceding parameters represent additional direct arguments that are not constrained to match the aggregated arguments. + + + Currently, ordered-set aggregates do not need to support + moving-aggregate mode, since they cannot be used as window functions. + diff --git a/doc/src/sgml/xaggr.sgml b/doc/src/sgml/xaggr.sgml index e77ef12e5c..cbbb051911 100644 --- a/doc/src/sgml/xaggr.sgml +++ b/doc/src/sgml/xaggr.sgml @@ -131,6 +131,161 @@ CREATE AGGREGATE avg (float8) + + Aggregate function calls in SQL allow DISTINCT + and ORDER BY options that control which rows are fed + to the aggregate's transition function and in what order. These + options are implemented behind the scenes and are not the concern + of the aggregate's support functions. + + + + For further details see the + + command. 
+ + + + Moving-Aggregate Mode + + + moving-aggregate mode + + + + aggregate function + moving aggregate + + + + Aggregate functions can optionally support moving-aggregate + mode, which allows substantially faster execution of aggregate + functions within windows with moving frame starting points. + (See + and for information about use of + aggregate functions as window functions.) + The basic idea is that in addition to a normal forward + transition function, the aggregate provides an inverse + transition function, which allows rows to be removed from the + aggregate's running state value when they exit the window frame. + For example a sum aggregate, which uses addition as the + forward transition function, would use subtraction as the inverse + transition function. Without an inverse transition function, the window + function mechanism must recalculate the aggregate from scratch each time + the frame starting point moves, resulting in run time proportional to the + number of input rows times the average frame length. With an inverse + transition function, the run time is only proportional to the number of + input rows. + + + + The inverse transition function is passed the current state value and the + aggregate input value(s) for the earliest row included in the current + state. It must reconstruct what the state value would have been if the + given input value had never been aggregated, but only the rows following + it. This sometimes requires that the forward transition function keep + more state than is needed for plain aggregation mode. Therefore, the + moving-aggregate mode uses a completely separate implementation from the + plain mode: it has its own state data type, its own forward transition + function, and its own final function if needed. These can be the same as + the plain mode's data type and functions, if there is no need for extra + state. 
+ + + + As an example, we could extend the sum aggregate given above + to support moving-aggregate mode like this: + + +CREATE AGGREGATE sum (complex) +( + sfunc = complex_add, + stype = complex, + initcond = '(0,0)', + msfunc = complex_add, + minvfunc = complex_sub, + mstype = complex, + minitcond = '(0,0)' +); + + + The parameters whose names begin with m define the + moving-aggregate implementation. Except for the inverse transition + function minvfunc, they correspond to the plain-aggregate + parameters without m. + + + + The forward transition function for moving-aggregate mode is not allowed + to return NULL as the new state value. If the inverse transition + function returns NULL, this is taken as an indication that the inverse + function cannot reverse the state calculation for this particular input, + and so the aggregate calculation will be redone from scratch for the + current frame starting position. This convention allows moving-aggregate + mode to be used in situations where there are some infrequent cases that + are impractical to reverse out of the running state value. The inverse + transition function can punt on these cases, and yet still come + out ahead so long as it can work for most cases. As an example, an + aggregate working with floating-point numbers might choose to punt when + a NaN (not a number) input has to be removed from the running + state value. + + + + When writing moving-aggregate support functions, it is important to be + sure that the inverse transition function can reconstruct the correct + state value exactly. Otherwise there might be user-visible differences + in results depending on whether the moving-aggregate mode is used. + An example of an aggregate for which adding an inverse transition + function seems easy at first, yet where this requirement cannot be met + is sum over float4 or float8 inputs. 
A + naive declaration of sum(float8) could be + + +CREATE AGGREGATE unsafe_sum (float8) +( + stype = float8, + sfunc = float8pl, + mstype = float8, + msfunc = float8pl, + minvfunc = float8mi +); + + + This aggregate, however, can give wildly different results than it would + have without the inverse transition function. For example, consider + + +SELECT + unsafe_sum(x) OVER (ORDER BY n ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING) +FROM (VALUES (1, 1.0e20::float8), + (2, 1.0::float8)) AS v (n,x); + + + This query returns 0 as its second result, rather than the + expected answer of 1. The cause is the limited precision of + floating-point values: adding 1 to 1e20 results + in 1e20 again, and so subtracting 1e20 from that + yields 0, not 1. Note that this is a limitation + of floating-point arithmetic in general, not a limitation + of PostgreSQL. + + + + + + Polymorphic and Variadic Aggregates + + + aggregate function + polymorphic + + + + aggregate function + variadic + + Aggregate functions can use polymorphic state transition functions or final functions, so that the same functions @@ -189,8 +344,8 @@ SELECT attrelid::regclass, array_accum(atttypid::regtype) by declaring its last argument as a VARIADIC array, in much the same fashion as for regular functions; see . The aggregate's transition - function must have the same array type as its last argument. The - transition function typically would also be marked VARIADIC, + function(s) must have the same array type as their last argument. The + transition function(s) typically would also be marked VARIADIC, but this is not strictly required. @@ -220,13 +375,15 @@ SELECT myaggregate(a, b, c ORDER BY a) FROM ... - - Aggregate function calls in SQL allow DISTINCT - and ORDER BY options that control which rows are fed - to the aggregate's transition function and in what order. These - options are implemented behind the scenes and are not the concern - of the aggregate's support functions. 
- + + + + Ordered-Set Aggregates + + + aggregate function + ordered set + The aggregates we have been describing so far are normal @@ -311,6 +468,21 @@ SELECT percentile_disc(0.5) WITHIN GROUP (ORDER BY income) FROM households; returns anyelement. + + Currently, ordered-set aggregates cannot be used as window functions, + and therefore there is no need for them to support moving-aggregate mode. + + + + + + Support Functions for Aggregates + + + aggregate function + support functions for + + A function written in C can detect that it is being called as an aggregate transition or final function by calling @@ -341,9 +513,6 @@ if (AggCheckCallContext(fcinfo, NULL)) source code. - - For further details see the - - command. - + + diff --git a/src/backend/catalog/pg_aggregate.c b/src/backend/catalog/pg_aggregate.c index fe6dc8a9a2..633b8f1d6a 100644 --- a/src/backend/catalog/pg_aggregate.c +++ b/src/backend/catalog/pg_aggregate.c @@ -57,10 +57,16 @@ AggregateCreate(const char *aggName, Oid variadicArgType, List *aggtransfnName, List *aggfinalfnName, + List *aggmtransfnName, + List *aggminvtransfnName, + List *aggmfinalfnName, List *aggsortopName, Oid aggTransType, int32 aggTransSpace, - const char *agginitval) + Oid aggmTransType, + int32 aggmTransSpace, + const char *agginitval, + const char *aggminitval) { Relation aggdesc; HeapTuple tup; @@ -69,14 +75,19 @@ AggregateCreate(const char *aggName, Form_pg_proc proc; Oid transfn; Oid finalfn = InvalidOid; /* can be omitted */ + Oid mtransfn = InvalidOid; /* can be omitted */ + Oid minvtransfn = InvalidOid; /* can be omitted */ + Oid mfinalfn = InvalidOid; /* can be omitted */ Oid sortop = InvalidOid; /* can be omitted */ Oid *aggArgTypes = parameterTypes->values; bool hasPolyArg; bool hasInternalArg; + bool mtransIsStrict = false; Oid rettype; Oid finaltype; Oid fnArgs[FUNC_MAX_ARGS]; int nargs_transfn; + int nargs_finalfn; Oid procOid; TupleDesc tupDesc; int i; @@ -128,6 +139,16 @@ AggregateCreate(const char *aggName, 
errmsg("cannot determine transition data type"), errdetail("An aggregate using a polymorphic transition type must have at least one polymorphic argument."))); + /* + * Likewise for moving-aggregate transtype, if any + */ + if (OidIsValid(aggmTransType) && + IsPolymorphicType(aggmTransType) && !hasPolyArg) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("cannot determine transition data type"), + errdetail("An aggregate using a polymorphic transition type must have at least one polymorphic argument."))); + /* * An ordered-set aggregate that is VARIADIC must be VARIADIC ANY. In * principle we could support regular variadic types, but it would make @@ -234,32 +255,120 @@ AggregateCreate(const char *aggName, (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), errmsg("must not omit initial value when transition function is strict and transition type is not compatible with input type"))); } + ReleaseSysCache(tup); + /* handle moving-aggregate transfn, if supplied */ + if (aggmtransfnName) + { + /* + * The arguments are the same as for the regular transfn, except that + * the transition data type might be different. So re-use the fnArgs + * values set up above, except for that one. + */ + Assert(OidIsValid(aggmTransType)); + fnArgs[0] = aggmTransType; + + mtransfn = lookup_agg_function(aggmtransfnName, nargs_transfn, + fnArgs, variadicArgType, + &rettype); + + /* As above, return type must exactly match declared mtranstype. 
*/ + if (rettype != aggmTransType) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("return type of transition function %s is not %s", + NameListToString(aggmtransfnName), + format_type_be(aggmTransType)))); + + tup = SearchSysCache1(PROCOID, ObjectIdGetDatum(mtransfn)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for function %u", mtransfn); + proc = (Form_pg_proc) GETSTRUCT(tup); + + /* + * If the mtransfn is strict and the minitval is NULL, check first + * input type and mtranstype are binary-compatible. + */ + if (proc->proisstrict && aggminitval == NULL) + { + if (numArgs < 1 || + !IsBinaryCoercible(aggArgTypes[0], aggmTransType)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("must not omit initial value when transition function is strict and transition type is not compatible with input type"))); + } + + /* Remember if mtransfn is strict; we may need this below */ + mtransIsStrict = proc->proisstrict; + + ReleaseSysCache(tup); + } + + /* handle minvtransfn, if supplied */ + if (aggminvtransfnName) + { + /* + * This must have the same number of arguments with the same types as + * the forward transition function, so just re-use the fnArgs data. + */ + Assert(aggmtransfnName); + + minvtransfn = lookup_agg_function(aggminvtransfnName, nargs_transfn, + fnArgs, variadicArgType, + &rettype); + + /* As above, return type must exactly match declared mtranstype. */ + if (rettype != aggmTransType) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("return type of inverse transition function %s is not %s", + NameListToString(aggminvtransfnName), + format_type_be(aggmTransType)))); + + tup = SearchSysCache1(PROCOID, ObjectIdGetDatum(minvtransfn)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for function %u", minvtransfn); + proc = (Form_pg_proc) GETSTRUCT(tup); + + /* + * We require the strictness settings of the forward and inverse + * transition functions to agree. 
This saves having to handle + * assorted special cases at execution time. + */ + if (proc->proisstrict != mtransIsStrict) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("strictness of aggregate's forward and inverse transition functions must match"))); + + ReleaseSysCache(tup); + } + + /* + * Set up fnArgs for looking up finalfn(s) + * + * For ordinary aggs, the finalfn just takes the transtype. For + * ordered-set aggs, it takes the transtype plus all args. (The + * aggregated args are useless at runtime, and are actually passed as + * NULLs, but we may need them in the function signature to allow + * resolution of a polymorphic agg's result type.) + */ + fnArgs[0] = aggTransType; + if (AGGKIND_IS_ORDERED_SET(aggKind)) + { + nargs_finalfn = numArgs + 1; + memcpy(fnArgs + 1, aggArgTypes, numArgs * sizeof(Oid)); + } + else + { + nargs_finalfn = 1; + /* variadic-ness of the aggregate doesn't affect finalfn */ + variadicArgType = InvalidOid; + } + /* handle finalfn, if supplied */ if (aggfinalfnName) { - int nargs_finalfn; - - /* - * For ordinary aggs, the finalfn just takes the transtype. For - * ordered-set aggs, it takes the transtype plus all args. (The - * aggregated args are useless at runtime, and are actually passed as - * NULLs, but we may need them in the function signature to allow - * resolution of a polymorphic agg's result type.) 
- */ - fnArgs[0] = aggTransType; - if (AGGKIND_IS_ORDERED_SET(aggKind)) - { - nargs_finalfn = numArgs + 1; - memcpy(fnArgs + 1, aggArgTypes, numArgs * sizeof(Oid)); - } - else - { - nargs_finalfn = 1; - /* variadic-ness of the aggregate doesn't affect finalfn */ - variadicArgType = InvalidOid; - } finalfn = lookup_agg_function(aggfinalfnName, nargs_finalfn, fnArgs, variadicArgType, &finaltype); @@ -314,6 +423,49 @@ AggregateCreate(const char *aggName, errmsg("unsafe use of pseudo-type \"internal\""), errdetail("A function returning \"internal\" must have at least one \"internal\" argument."))); + /* + * If a moving-aggregate implementation is supplied, look up its finalfn + * if any, and check that the implied aggregate result type matches the + * plain implementation. + */ + if (OidIsValid(aggmTransType)) + { + /* handle finalfn, if supplied */ + if (aggmfinalfnName) + { + /* + * The arguments are the same as for the regular finalfn, except + * that the transition data type might be different. So re-use + * the fnArgs values set up above, except for that one. 
+ */ + fnArgs[0] = aggmTransType; + + mfinalfn = lookup_agg_function(aggmfinalfnName, nargs_finalfn, + fnArgs, variadicArgType, + &rettype); + + /* As above, check strictness if it's an ordered-set agg */ + if (AGGKIND_IS_ORDERED_SET(aggKind) && func_strict(mfinalfn)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("final function of an ordered-set aggregate must not be declared STRICT"))); + } + else + { + /* + * If no finalfn, aggregate result type is type of the state value + */ + rettype = aggmTransType; + } + Assert(OidIsValid(rettype)); + if (rettype != finaltype) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("moving-aggregate implementation returns type %s, but plain implementation returns type %s", + format_type_be(rettype), + format_type_be(finaltype)))); + } + /* handle sortop, if supplied */ if (aggsortopName) { @@ -340,6 +492,13 @@ AggregateCreate(const char *aggName, if (aclresult != ACLCHECK_OK) aclcheck_error_type(aclresult, aggTransType); + if (OidIsValid(aggmTransType)) + { + aclresult = pg_type_aclcheck(aggmTransType, GetUserId(), ACL_USAGE); + if (aclresult != ACLCHECK_OK) + aclcheck_error_type(aclresult, aggmTransType); + } + aclresult = pg_type_aclcheck(finaltype, GetUserId(), ACL_USAGE); if (aclresult != ACLCHECK_OK) aclcheck_error_type(aclresult, finaltype); @@ -392,13 +551,22 @@ AggregateCreate(const char *aggName, values[Anum_pg_aggregate_aggnumdirectargs - 1] = Int16GetDatum(numDirectArgs); values[Anum_pg_aggregate_aggtransfn - 1] = ObjectIdGetDatum(transfn); values[Anum_pg_aggregate_aggfinalfn - 1] = ObjectIdGetDatum(finalfn); + values[Anum_pg_aggregate_aggmtransfn - 1] = ObjectIdGetDatum(mtransfn); + values[Anum_pg_aggregate_aggminvtransfn - 1] = ObjectIdGetDatum(minvtransfn); + values[Anum_pg_aggregate_aggmfinalfn - 1] = ObjectIdGetDatum(mfinalfn); values[Anum_pg_aggregate_aggsortop - 1] = ObjectIdGetDatum(sortop); values[Anum_pg_aggregate_aggtranstype - 1] = 
ObjectIdGetDatum(aggTransType); values[Anum_pg_aggregate_aggtransspace - 1] = Int32GetDatum(aggTransSpace); + values[Anum_pg_aggregate_aggmtranstype - 1] = ObjectIdGetDatum(aggmTransType); + values[Anum_pg_aggregate_aggmtransspace - 1] = Int32GetDatum(aggmTransSpace); if (agginitval) values[Anum_pg_aggregate_agginitval - 1] = CStringGetTextDatum(agginitval); else nulls[Anum_pg_aggregate_agginitval - 1] = true; + if (aggminitval) + values[Anum_pg_aggregate_aggminitval - 1] = CStringGetTextDatum(aggminitval); + else + nulls[Anum_pg_aggregate_aggminitval - 1] = true; aggdesc = heap_open(AggregateRelationId, RowExclusiveLock); tupDesc = aggdesc->rd_att; @@ -414,6 +582,7 @@ AggregateCreate(const char *aggName, * Create dependencies for the aggregate (above and beyond those already * made by ProcedureCreate). Note: we don't need an explicit dependency * on aggTransType since we depend on it indirectly through transfn. + * Likewise for aggmTransType if any. */ myself.classId = ProcedureRelationId; myself.objectId = procOid; @@ -434,6 +603,33 @@ AggregateCreate(const char *aggName, recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); } + /* Depends on forward transition function, if any */ + if (OidIsValid(mtransfn)) + { + referenced.classId = ProcedureRelationId; + referenced.objectId = mtransfn; + referenced.objectSubId = 0; + recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); + } + + /* Depends on inverse transition function, if any */ + if (OidIsValid(minvtransfn)) + { + referenced.classId = ProcedureRelationId; + referenced.objectId = minvtransfn; + referenced.objectSubId = 0; + recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); + } + + /* Depends on final function, if any */ + if (OidIsValid(mfinalfn)) + { + referenced.classId = ProcedureRelationId; + referenced.objectId = mfinalfn; + referenced.objectSubId = 0; + recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); + } + /* Depends on sort operator, if any */ if 
(OidIsValid(sortop)) { @@ -447,7 +643,12 @@ AggregateCreate(const char *aggName, } /* - * lookup_agg_function -- common code for finding both transfn and finalfn + * lookup_agg_function + * common code for finding transfn, invtransfn and finalfn + * + * Returns OID of function, and stores its return type into *rettype + * + * NB: must not scribble on input_types[], as we may re-use those */ static Oid lookup_agg_function(List *fnName, diff --git a/src/backend/commands/aggregatecmds.c b/src/backend/commands/aggregatecmds.c index 640e19cf12..9714112f6d 100644 --- a/src/backend/commands/aggregatecmds.c +++ b/src/backend/commands/aggregatecmds.c @@ -61,11 +61,17 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, char aggKind = AGGKIND_NORMAL; List *transfuncName = NIL; List *finalfuncName = NIL; + List *mtransfuncName = NIL; + List *minvtransfuncName = NIL; + List *mfinalfuncName = NIL; List *sortoperatorName = NIL; TypeName *baseType = NULL; TypeName *transType = NULL; + TypeName *mtransType = NULL; int32 transSpace = 0; + int32 mtransSpace = 0; char *initval = NULL; + char *minitval = NULL; int numArgs; int numDirectArgs = 0; oidvector *parameterTypes; @@ -75,7 +81,9 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, List *parameterDefaults; Oid variadicArgType; Oid transTypeId; + Oid mtransTypeId = InvalidOid; char transTypeType; + char mtransTypeType = 0; ListCell *pl; /* Convert list of names to a name and namespace */ @@ -114,6 +122,12 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, transfuncName = defGetQualifiedName(defel); else if (pg_strcasecmp(defel->defname, "finalfunc") == 0) finalfuncName = defGetQualifiedName(defel); + else if (pg_strcasecmp(defel->defname, "msfunc") == 0) + mtransfuncName = defGetQualifiedName(defel); + else if (pg_strcasecmp(defel->defname, "minvfunc") == 0) + minvtransfuncName = defGetQualifiedName(defel); + else if (pg_strcasecmp(defel->defname, 
"mfinalfunc") == 0) + mfinalfuncName = defGetQualifiedName(defel); else if (pg_strcasecmp(defel->defname, "sortop") == 0) sortoperatorName = defGetQualifiedName(defel); else if (pg_strcasecmp(defel->defname, "basetype") == 0) @@ -135,10 +149,16 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, transType = defGetTypeName(defel); else if (pg_strcasecmp(defel->defname, "sspace") == 0) transSpace = defGetInt32(defel); + else if (pg_strcasecmp(defel->defname, "mstype") == 0) + mtransType = defGetTypeName(defel); + else if (pg_strcasecmp(defel->defname, "msspace") == 0) + mtransSpace = defGetInt32(defel); else if (pg_strcasecmp(defel->defname, "initcond") == 0) initval = defGetString(defel); else if (pg_strcasecmp(defel->defname, "initcond1") == 0) initval = defGetString(defel); + else if (pg_strcasecmp(defel->defname, "minitcond") == 0) + minitval = defGetString(defel); else ereport(WARNING, (errcode(ERRCODE_SYNTAX_ERROR), @@ -158,6 +178,46 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), errmsg("aggregate sfunc must be specified"))); + /* + * if mtransType is given, mtransfuncName and minvtransfuncName must be as + * well; if not, then none of the moving-aggregate options should have + * been given. 
+ */ + if (mtransType != NULL) + { + if (mtransfuncName == NIL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("aggregate msfunc must be specified when mstype is specified"))); + if (minvtransfuncName == NIL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("aggregate minvfunc must be specified when mstype is specified"))); + } + else + { + if (mtransfuncName != NIL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("aggregate msfunc must not be specified without mstype"))); + if (minvtransfuncName != NIL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("aggregate minvfunc must not be specified without mstype"))); + if (mfinalfuncName != NIL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("aggregate mfinalfunc must not be specified without mstype"))); + if (mtransSpace != 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("aggregate msspace must not be specified without mstype"))); + if (minitval != NULL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("aggregate minitcond must not be specified without mstype"))); + } + /* * look up the aggregate's input datatype(s). */ @@ -250,6 +310,27 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, format_type_be(transTypeId)))); } + /* + * If a moving-aggregate transtype is specified, look that up. Same + * restrictions as for transtype. 
+ */ + if (mtransType) + { + mtransTypeId = typenameTypeId(NULL, mtransType); + mtransTypeType = get_typtype(mtransTypeId); + if (mtransTypeType == TYPTYPE_PSEUDO && + !IsPolymorphicType(mtransTypeId)) + { + if (mtransTypeId == INTERNALOID && superuser()) + /* okay */ ; + else + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("aggregate transition data type cannot be %s", + format_type_be(mtransTypeId)))); + } + } + /* * If we have an initval, and it's not for a pseudotype (particularly a * polymorphic type), make sure it's acceptable to the type's input @@ -268,6 +349,18 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, (void) OidInputFunctionCall(typinput, initval, typioparam, -1); } + /* + * Likewise for moving-aggregate initval. + */ + if (minitval && mtransTypeType != TYPTYPE_PSEUDO) + { + Oid typinput, + typioparam; + + getTypeInputInfo(mtransTypeId, &typinput, &typioparam); + (void) OidInputFunctionCall(typinput, minitval, typioparam, -1); + } + /* * Most of the argument-checking is done inside of AggregateCreate */ @@ -284,8 +377,14 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, variadicArgType, transfuncName, /* step function name */ finalfuncName, /* final function name */ + mtransfuncName, /* fwd trans function name */ + minvtransfuncName, /* inv trans function name */ + mfinalfuncName, /* final function name */ sortoperatorName, /* sort operator name */ transTypeId, /* transition data type */ transSpace, /* transition space */ - initval); /* initial condition */ + mtransTypeId, /* transition data type */ + mtransSpace, /* transition space */ + initval, /* initial condition */ + minitval); /* initial condition */ } diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index 7e4bca5b4d..d60845bcd3 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -1798,8 +1798,10 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) 
aggref->aggtype, aggref->inputcollid, transfn_oid, + InvalidOid, /* invtrans is not needed here */ finalfn_oid, &transfnexpr, + NULL, &finalfnexpr); /* set up infrastructure for calling the transfn and finalfn */ @@ -1847,7 +1849,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) * type and transtype are the same (or at least binary-compatible), so * that it's OK to use the first aggregated input value as the initial * transValue. This should have been checked at agg definition time, - * but just in case... + * but we must check again in case the transfn's strictness property + * has been changed. */ if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull) { @@ -2126,6 +2129,12 @@ ExecReScanAgg(AggState *node) ExecReScan(node->ss.ps.lefttree); } + +/*********************************************************************** + * API exposed to aggregate functions + ***********************************************************************/ + + /* * AggCheckCallContext - test if a SQL function is being called as an aggregate * @@ -2152,7 +2161,7 @@ AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext) if (fcinfo->context && IsA(fcinfo->context, WindowAggState)) { if (aggcontext) - *aggcontext = ((WindowAggState *) fcinfo->context)->aggcontext; + *aggcontext = ((WindowAggState *) fcinfo->context)->curaggcontext; return AGG_CONTEXT_WINDOW; } diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c index 0b558e5923..046637fb09 100644 --- a/src/backend/executor/nodeWindowAgg.c +++ b/src/backend/executor/nodeWindowAgg.c @@ -102,16 +102,18 @@ typedef struct WindowStatePerFuncData */ typedef struct WindowStatePerAggData { - /* Oids of transfer functions */ + /* Oids of transition functions */ Oid transfn_oid; + Oid invtransfn_oid; /* may be InvalidOid */ Oid finalfn_oid; /* may be InvalidOid */ /* - * fmgr lookup data for transfer functions --- only valid when + * fmgr lookup data for transition functions --- only 
valid when * corresponding oid is not InvalidOid. Note in particular that fn_strict * flags are kept here. */ FmgrInfo transfn; + FmgrInfo invtransfn; FmgrInfo finalfn; /* @@ -139,11 +141,17 @@ typedef struct WindowStatePerAggData int wfuncno; /* index of associated PerFuncData */ + /* Context holding transition value and possibly other subsidiary data */ + MemoryContext aggcontext; /* may be private, or winstate->aggcontext */ + /* Current transition value */ Datum transValue; /* current transition value */ bool transValueIsNull; - bool noTransValue; /* true if transValue not set yet */ + int64 transValueCount; /* number of currently-aggregated rows */ + + /* Data local to eval_windowaggregates() */ + bool restart; /* need to restart this agg in this cycle? */ } WindowStatePerAggData; static void initialize_windowaggregate(WindowAggState *winstate, @@ -152,6 +160,9 @@ static void initialize_windowaggregate(WindowAggState *winstate, static void advance_windowaggregate(WindowAggState *winstate, WindowStatePerFunc perfuncstate, WindowStatePerAgg peraggstate); +static bool advance_windowaggregate_base(WindowAggState *winstate, + WindowStatePerFunc perfuncstate, + WindowStatePerAgg peraggstate); static void finalize_windowaggregate(WindowAggState *winstate, WindowStatePerFunc perfuncstate, WindowStatePerAgg peraggstate, @@ -193,18 +204,27 @@ initialize_windowaggregate(WindowAggState *winstate, { MemoryContext oldContext; + /* + * If we're using a private aggcontext, we may reset it here. But if the + * context is shared, we don't know which other aggregates may still need + * it, so we must leave it to the caller to reset at an appropriate time. 
+ */ + if (peraggstate->aggcontext != winstate->aggcontext) + MemoryContextResetAndDeleteChildren(peraggstate->aggcontext); + if (peraggstate->initValueIsNull) peraggstate->transValue = peraggstate->initValue; else { - oldContext = MemoryContextSwitchTo(winstate->aggcontext); + oldContext = MemoryContextSwitchTo(peraggstate->aggcontext); peraggstate->transValue = datumCopy(peraggstate->initValue, peraggstate->transtypeByVal, peraggstate->transtypeLen); MemoryContextSwitchTo(oldContext); } peraggstate->transValueIsNull = peraggstate->initValueIsNull; - peraggstate->noTransValue = peraggstate->initValueIsNull; + peraggstate->transValueCount = 0; + peraggstate->resultValue = (Datum) 0; peraggstate->resultValueIsNull = true; } @@ -258,7 +278,8 @@ advance_windowaggregate(WindowAggState *winstate, { /* * For a strict transfn, nothing happens when there's a NULL input; we - * just keep the prior transValue. + * just keep the prior transValue. Note transValueCount doesn't + * change either. */ for (i = 1; i <= numArguments; i++) { @@ -268,41 +289,47 @@ advance_windowaggregate(WindowAggState *winstate, return; } } - if (peraggstate->noTransValue) + + /* + * For strict transition functions with initial value NULL we use the + * first non-NULL input as the initial state. (We already checked + * that the agg's input type is binary-compatible with its transtype, + * so straight copy here is OK.) + * + * We must copy the datum into aggcontext if it is pass-by-ref. We do + * not need to pfree the old transValue, since it's NULL. + */ + if (peraggstate->transValueCount == 0 && peraggstate->transValueIsNull) { - /* - * transValue has not been initialized. This is the first non-NULL - * input value. We use it as the initial value for transValue. (We - * already checked that the agg's input type is binary-compatible - * with its transtype, so straight copy here is OK.) - * - * We must copy the datum into aggcontext if it is pass-by-ref. 
We - * do not need to pfree the old transValue, since it's NULL. - */ - MemoryContextSwitchTo(winstate->aggcontext); + MemoryContextSwitchTo(peraggstate->aggcontext); peraggstate->transValue = datumCopy(fcinfo->arg[1], peraggstate->transtypeByVal, peraggstate->transtypeLen); peraggstate->transValueIsNull = false; - peraggstate->noTransValue = false; + peraggstate->transValueCount = 1; MemoryContextSwitchTo(oldContext); return; } + if (peraggstate->transValueIsNull) { /* * Don't call a strict function with NULL inputs. Note it is * possible to get here despite the above tests, if the transfn is - * strict *and* returned a NULL on a prior cycle. If that happens - * we will propagate the NULL all the way to the end. + * strict *and* returned a NULL on a prior cycle. If that happens + * we will propagate the NULL all the way to the end. That can + * only happen if there's no inverse transition function, though, + * since we disallow transitions back to NULL when there is one. */ MemoryContextSwitchTo(oldContext); + Assert(!OidIsValid(peraggstate->invtransfn_oid)); return; } } /* - * OK to call the transition function + * OK to call the transition function. Set winstate->curaggcontext while + * calling it, for possible use by AggCheckCallContext. */ InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn), numArguments + 1, @@ -310,7 +337,26 @@ advance_windowaggregate(WindowAggState *winstate, (void *) winstate, NULL); fcinfo->arg[0] = peraggstate->transValue; fcinfo->argnull[0] = peraggstate->transValueIsNull; + winstate->curaggcontext = peraggstate->aggcontext; newVal = FunctionCallInvoke(fcinfo); + winstate->curaggcontext = NULL; + + /* + * Moving-aggregate transition functions must not return NULL, see + * advance_windowaggregate_base(). 
+ */ + if (fcinfo->isnull && OidIsValid(peraggstate->invtransfn_oid)) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("moving-aggregate transition function must not return NULL"))); + + /* + * We must track the number of rows included in transValue, since to + * remove the last input, advance_windowaggregate_base() mustn't call the + * inverse transition function, but simply reset transValue back to its + * initial value. + */ + peraggstate->transValueCount++; /* * If pass-by-ref datatype, must copy the new value into aggcontext and @@ -322,7 +368,7 @@ advance_windowaggregate(WindowAggState *winstate, { if (!fcinfo->isnull) { - MemoryContextSwitchTo(winstate->aggcontext); + MemoryContextSwitchTo(peraggstate->aggcontext); newVal = datumCopy(newVal, peraggstate->transtypeByVal, peraggstate->transtypeLen); @@ -336,6 +382,162 @@ advance_windowaggregate(WindowAggState *winstate, peraggstate->transValueIsNull = fcinfo->isnull; } +/* + * advance_windowaggregate_base + * Remove the oldest tuple from an aggregation. + * + * This is very much like advance_windowaggregate, except that we will call + * the inverse transition function (which caller must have checked is + * available). + * + * Returns true if we successfully removed the current row from this + * aggregate, false if not (in the latter case, caller is responsible + * for cleaning up by restarting the aggregation). 
+ */ +static bool +advance_windowaggregate_base(WindowAggState *winstate, + WindowStatePerFunc perfuncstate, + WindowStatePerAgg peraggstate) +{ + WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate; + int numArguments = perfuncstate->numArguments; + FunctionCallInfoData fcinfodata; + FunctionCallInfo fcinfo = &fcinfodata; + Datum newVal; + ListCell *arg; + int i; + MemoryContext oldContext; + ExprContext *econtext = winstate->tmpcontext; + ExprState *filter = wfuncstate->aggfilter; + + oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); + + /* Skip anything FILTERed out */ + if (filter) + { + bool isnull; + Datum res = ExecEvalExpr(filter, econtext, &isnull, NULL); + + if (isnull || !DatumGetBool(res)) + { + MemoryContextSwitchTo(oldContext); + return true; + } + } + + /* We start from 1, since the 0th arg will be the transition value */ + i = 1; + foreach(arg, wfuncstate->args) + { + ExprState *argstate = (ExprState *) lfirst(arg); + + fcinfo->arg[i] = ExecEvalExpr(argstate, econtext, + &fcinfo->argnull[i], NULL); + i++; + } + + if (peraggstate->invtransfn.fn_strict) + { + /* + * For a strict (inv)transfn, nothing happens when there's a NULL + * input; we just keep the prior transValue. Note transValueCount + * doesn't change either. + */ + for (i = 1; i <= numArguments; i++) + { + if (fcinfo->argnull[i]) + { + MemoryContextSwitchTo(oldContext); + return true; + } + } + } + + /* There should still be an added but not yet removed value */ + Assert(peraggstate->transValueCount > 0); + + /* + * In moving-aggregate mode, the state must never be NULL, except possibly + * before any rows have been aggregated (which is surely not the case at + * this point). This restriction allows us to interpret a NULL result + * from the inverse function as meaning "sorry, can't do an inverse + * transition in this case". We already checked this in + * advance_windowaggregate, but just for safety, check again. 
+ */ + if (peraggstate->transValueIsNull) + elog(ERROR, "aggregate transition value is NULL before inverse transition"); + + /* + * We mustn't use the inverse transition function to remove the last + * input. Doing so would yield a non-NULL state, whereas we should be in + * the initial state afterwards which may very well be NULL. So instead, + * we simply re-initialize the aggregate in this case. + */ + if (peraggstate->transValueCount == 1) + { + MemoryContextSwitchTo(oldContext); + initialize_windowaggregate(winstate, + &winstate->perfunc[peraggstate->wfuncno], + peraggstate); + return true; + } + + /* + * OK to call the inverse transition function. Set + * winstate->curaggcontext while calling it, for possible use by + * AggCheckCallContext. + */ + InitFunctionCallInfoData(*fcinfo, &(peraggstate->invtransfn), + numArguments + 1, + perfuncstate->winCollation, + (void *) winstate, NULL); + fcinfo->arg[0] = peraggstate->transValue; + fcinfo->argnull[0] = peraggstate->transValueIsNull; + winstate->curaggcontext = peraggstate->aggcontext; + newVal = FunctionCallInvoke(fcinfo); + winstate->curaggcontext = NULL; + + /* + * If the function returns NULL, report failure, forcing a restart. + */ + if (fcinfo->isnull) + { + MemoryContextSwitchTo(oldContext); + return false; + } + + /* Update number of rows included in transValue */ + peraggstate->transValueCount--; + + /* + * If pass-by-ref datatype, must copy the new value into aggcontext and + * pfree the prior transValue. But if invtransfn returned a pointer to + * its first input, we don't need to do anything. + * + * Note: the checks for null values here will never fire, but it seems + * best to have this stanza look just like advance_windowaggregate. 
+ */ + if (!peraggstate->transtypeByVal && + DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue)) + { + if (!fcinfo->isnull) + { + MemoryContextSwitchTo(peraggstate->aggcontext); + newVal = datumCopy(newVal, + peraggstate->transtypeByVal, + peraggstate->transtypeLen); + } + if (!peraggstate->transValueIsNull) + pfree(DatumGetPointer(peraggstate->transValue)); + } + + MemoryContextSwitchTo(oldContext); + peraggstate->transValue = newVal; + peraggstate->transValueIsNull = fcinfo->isnull; + + return true; +} + /* * finalize_windowaggregate * parallel to finalize_aggregate in nodeAgg.c @@ -370,7 +572,9 @@ finalize_windowaggregate(WindowAggState *winstate, } else { + winstate->curaggcontext = peraggstate->aggcontext; *result = FunctionCallInvoke(&fcinfo); + winstate->curaggcontext = NULL; *isnull = fcinfo.isnull; } } @@ -396,7 +600,9 @@ finalize_windowaggregate(WindowAggState *winstate, * eval_windowaggregates * evaluate plain aggregates being used as window functions * - * Much of this is duplicated from nodeAgg.c. But NOTE that we expect to be + * This differs from nodeAgg.c in two ways. First, if the window's frame + * start position moves, we use the inverse transition function (if it exists) + * to remove rows from the transition value. And second, we expect to be * able to call aggregate final functions repeatedly after aggregating more * data onto the same transition value. This is not a behavior required by * nodeAgg.c. 
@@ -406,12 +612,15 @@ eval_windowaggregates(WindowAggState *winstate) { WindowStatePerAgg peraggstate; int wfuncno, - numaggs; - int i; + numaggs, + numaggs_restart, + i; + int64 aggregatedupto_nonrestarted; MemoryContext oldContext; ExprContext *econtext; WindowObject agg_winobj; TupleTableSlot *agg_row_slot; + TupleTableSlot *temp_slot; numaggs = winstate->numaggs; if (numaggs == 0) @@ -421,6 +630,7 @@ eval_windowaggregates(WindowAggState *winstate) econtext = winstate->ss.ps.ps_ExprContext; agg_winobj = winstate->agg_winobj; agg_row_slot = winstate->agg_row_slot; + temp_slot = winstate->temp_slot_1; /* * Currently, we support only a subset of the SQL-standard window framing @@ -438,9 +648,17 @@ eval_windowaggregates(WindowAggState *winstate) * damage the running transition value, but we have the same assumption in * nodeAgg.c too (when it rescans an existing hash table). * - * For other frame start rules, we discard the aggregate state and re-run - * the aggregates whenever the frame head row moves. We can still - * optimize as above whenever successive rows share the same frame head. + * If the frame start does sometimes move, we can still optimize as above + * whenever successive rows share the same frame head, but if the frame + * head moves beyond the previous head we try to remove those rows using + * the aggregate's inverse transition function. This function restores + * the aggregate's current state to what it would be if the removed row + * had never been aggregated in the first place. Inverse transition + * functions may optionally return NULL, indicating that the function was + * unable to remove the tuple from aggregation. If this happens, or if + * the aggregate doesn't have an inverse transition function at all, we + * must perform the aggregation all over again for all tuples within the + * new frame boundaries. * * In many common cases, multiple rows share the same frame and hence the * same aggregate value. 
(In particular, if there's no ORDER BY in a RANGE @@ -452,63 +670,31 @@ eval_windowaggregates(WindowAggState *winstate) * 'aggregatedupto' keeps track of the first row that has not yet been * accumulated into the aggregate transition values. Whenever we start a * new peer group, we accumulate forward to the end of the peer group. - * - * TODO: Rerunning aggregates from the frame start can be pretty slow. For - * some aggregates like SUM and COUNT we could avoid that by implementing - * a "negative transition function" that would be called for each row as - * it exits the frame. We'd have to think about avoiding recalculation of - * volatile arguments of aggregate functions, too. */ /* * First, update the frame head position. + * + * The frame head should never move backwards, and the code below wouldn't + * cope if it did, so for safety we complain if it does. */ - update_frameheadpos(agg_winobj, winstate->temp_slot_1); + update_frameheadpos(agg_winobj, temp_slot); + if (winstate->frameheadpos < winstate->aggregatedbase) + elog(ERROR, "window frame head moved backward"); /* - * Initialize aggregates on first call for partition, or if the frame head - * position moved since last time. + * If the frame didn't change compared to the previous row, we can re-use + * the result values that were previously saved at the bottom of this + * function. Since we don't know the current frame's end yet, this is not + * possible to check for fully. But if the frame end mode is UNBOUNDED + * FOLLOWING or CURRENT ROW, and the current row lies within the previous + * row's frame, then the two frames' ends must coincide. Note that on the + * first row aggregatedbase == aggregatedupto, meaning this test must + * fail, so we don't need to check the "there was no previous row" case + * explicitly here. 
*/ - if (winstate->currentpos == 0 || - winstate->frameheadpos != winstate->aggregatedbase) - { - /* - * Discard transient aggregate values - */ - MemoryContextResetAndDeleteChildren(winstate->aggcontext); - - for (i = 0; i < numaggs; i++) - { - peraggstate = &winstate->peragg[i]; - wfuncno = peraggstate->wfuncno; - initialize_windowaggregate(winstate, - &winstate->perfunc[wfuncno], - peraggstate); - } - - /* - * If we created a mark pointer for aggregates, keep it pushed up to - * frame head, so that tuplestore can discard unnecessary rows. - */ - if (agg_winobj->markptr >= 0) - WinSetMarkPosition(agg_winobj, winstate->frameheadpos); - - /* - * Initialize for loop below - */ - ExecClearTuple(agg_row_slot); - winstate->aggregatedbase = winstate->frameheadpos; - winstate->aggregatedupto = winstate->frameheadpos; - } - - /* - * In UNBOUNDED_FOLLOWING mode, we don't have to recalculate aggregates - * except when the frame head moves. In END_CURRENT_ROW mode, we only - * have to recalculate when the frame head moves or currentpos has - * advanced past the place we'd aggregated up to. Check for these cases - * and if so, reuse the saved result values. - */ - if ((winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING | + if (winstate->aggregatedbase == winstate->frameheadpos && + (winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING | FRAMEOPTION_END_CURRENT_ROW)) && winstate->aggregatedbase <= winstate->currentpos && winstate->aggregatedupto > winstate->currentpos) @@ -523,6 +709,158 @@ eval_windowaggregates(WindowAggState *winstate) return; } + /*---------- + * Initialize restart flags. 
+ * + * We restart the aggregation: + * - if we're processing the first row in the partition, or + * - if the frame's head moved and we cannot use an inverse + * transition function, or + * - if the new frame doesn't overlap the old one + * + * Note that we don't strictly need to restart in the last case, but if + * we're going to remove all rows from the aggregation anyway, a restart + * surely is faster. + *---------- + */ + numaggs_restart = 0; + for (i = 0; i < numaggs; i++) + { + peraggstate = &winstate->peragg[i]; + if (winstate->currentpos == 0 || + (winstate->aggregatedbase != winstate->frameheadpos && + !OidIsValid(peraggstate->invtransfn_oid)) || + winstate->aggregatedupto <= winstate->frameheadpos) + { + peraggstate->restart = true; + numaggs_restart++; + } + else + peraggstate->restart = false; + } + + /* + * If we have any possibly-moving aggregates, attempt to advance + * aggregatedbase to match the frame's head by removing input rows that + * fell off the top of the frame from the aggregations. This can fail, + * i.e. advance_windowaggregate_base() can return false, in which case + * we'll restart that aggregate below. + */ + while (numaggs_restart < numaggs && + winstate->aggregatedbase < winstate->frameheadpos) + { + /* + * Fetch the next tuple of those being removed. This should never fail + * as we should have been here before. + */ + if (!window_gettupleslot(agg_winobj, winstate->aggregatedbase, + temp_slot)) + elog(ERROR, "could not re-fetch previously fetched frame row"); + + /* Set tuple context for evaluation of aggregate arguments */ + winstate->tmpcontext->ecxt_outertuple = temp_slot; + + /* + * Perform the inverse transition for each aggregate function in the + * window, unless it has already been marked as needing a restart. 
+ */ + for (i = 0; i < numaggs; i++) + { + bool ok; + + peraggstate = &winstate->peragg[i]; + if (peraggstate->restart) + continue; + + wfuncno = peraggstate->wfuncno; + ok = advance_windowaggregate_base(winstate, + &winstate->perfunc[wfuncno], + peraggstate); + if (!ok) + { + /* Inverse transition function has failed, must restart */ + peraggstate->restart = true; + numaggs_restart++; + } + } + + /* Reset per-input-tuple context after each tuple */ + ResetExprContext(winstate->tmpcontext); + + /* And advance the aggregated-row state */ + winstate->aggregatedbase++; + ExecClearTuple(temp_slot); + } + + /* + * If we successfully advanced the base rows of all the aggregates, + * aggregatedbase now equals frameheadpos; but if we failed for any, we + * must forcibly update aggregatedbase. + */ + winstate->aggregatedbase = winstate->frameheadpos; + + /* + * If we created a mark pointer for aggregates, keep it pushed up to frame + * head, so that tuplestore can discard unnecessary rows. + */ + if (agg_winobj->markptr >= 0) + WinSetMarkPosition(agg_winobj, winstate->frameheadpos); + + /* + * Now restart the aggregates that require it. + * + * We assume that aggregates using the shared context always restart if + * *any* aggregate restarts, and we may thus clean up the shared + * aggcontext if that is the case. Private aggcontexts are reset by + * initialize_windowaggregate() if their owning aggregate restarts. If we + * aren't restarting an aggregate, we need to free any previously saved + * result for it, else we'll leak memory. 
+ */ + if (numaggs_restart > 0) + MemoryContextResetAndDeleteChildren(winstate->aggcontext); + for (i = 0; i < numaggs; i++) + { + peraggstate = &winstate->peragg[i]; + + /* Aggregates using the shared ctx must restart if *any* agg does */ + Assert(peraggstate->aggcontext != winstate->aggcontext || + numaggs_restart == 0 || + peraggstate->restart); + + if (peraggstate->restart) + { + wfuncno = peraggstate->wfuncno; + initialize_windowaggregate(winstate, + &winstate->perfunc[wfuncno], + peraggstate); + } + else if (!peraggstate->resultValueIsNull) + { + if (!peraggstate->resulttypeByVal) + pfree(DatumGetPointer(peraggstate->resultValue)); + peraggstate->resultValue = (Datum) 0; + peraggstate->resultValueIsNull = true; + } + } + + /* + * Non-restarted aggregates now contain the rows between aggregatedbase + * (i.e., frameheadpos) and aggregatedupto, while restarted aggregates + * contain no rows. If there are any restarted aggregates, we must thus + * begin aggregating anew at frameheadpos, otherwise we may simply + * continue at aggregatedupto. We must remember the old value of + * aggregatedupto to know how long to skip advancing non-restarted + * aggregates. If we modify aggregatedupto, we must also clear + * agg_row_slot, per the loop invariant below. + */ + aggregatedupto_nonrestarted = winstate->aggregatedupto; + if (numaggs_restart > 0 && + winstate->aggregatedupto != winstate->frameheadpos) + { + winstate->aggregatedupto = winstate->frameheadpos; + ExecClearTuple(agg_row_slot); + } + /* * Advance until we reach a row not in frame (or end of partition). 
* @@ -551,6 +889,12 @@ eval_windowaggregates(WindowAggState *winstate) for (i = 0; i < numaggs; i++) { peraggstate = &winstate->peragg[i]; + + /* Non-restarted aggs skip until aggregatedupto_nonrestarted */ + if (!peraggstate->restart && + winstate->aggregatedupto < aggregatedupto_nonrestarted) + continue; + wfuncno = peraggstate->wfuncno; advance_windowaggregate(winstate, &winstate->perfunc[wfuncno], @@ -565,6 +909,9 @@ eval_windowaggregates(WindowAggState *winstate) ExecClearTuple(agg_row_slot); } + /* The frame's end is not supposed to move backwards, ever */ + Assert(aggregatedupto_nonrestarted <= winstate->aggregatedupto); + /* * finalize aggregates and fill result/isnull fields. */ @@ -589,28 +936,14 @@ eval_windowaggregates(WindowAggState *winstate) * advance that the next row can't possibly share the same frame. Is * it worth detecting that and skipping this code? */ - if (!peraggstate->resulttypeByVal) + if (!peraggstate->resulttypeByVal && !*isnull) { - /* - * clear old resultValue in order not to leak memory. (Note: the - * new result can't possibly be the same datum as old resultValue, - * because we never passed it to the trans function.) - */ - if (!peraggstate->resultValueIsNull) - pfree(DatumGetPointer(peraggstate->resultValue)); - - /* - * If pass-by-ref, copy it into our aggregate context. 
- */ - if (!*isnull) - { - oldContext = MemoryContextSwitchTo(winstate->aggcontext); - peraggstate->resultValue = - datumCopy(*result, - peraggstate->resulttypeByVal, - peraggstate->resulttypeLen); - MemoryContextSwitchTo(oldContext); - } + oldContext = MemoryContextSwitchTo(peraggstate->aggcontext); + peraggstate->resultValue = + datumCopy(*result, + peraggstate->resulttypeByVal, + peraggstate->resulttypeLen); + MemoryContextSwitchTo(oldContext); } else { @@ -650,6 +983,8 @@ eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate, (void *) perfuncstate->winobj, NULL); /* Just in case, make all the regular argument slots be null */ memset(fcinfo.argnull, true, perfuncstate->numArguments); + /* Window functions don't have a current aggregate context, either */ + winstate->curaggcontext = NULL; *result = FunctionCallInvoke(&fcinfo); *isnull = fcinfo.isnull; @@ -870,6 +1205,11 @@ release_partition(WindowAggState *winstate) */ MemoryContextResetAndDeleteChildren(winstate->partcontext); MemoryContextResetAndDeleteChildren(winstate->aggcontext); + for (i = 0; i < winstate->numaggs; i++) + { + if (winstate->peragg[i].aggcontext != winstate->aggcontext) + MemoryContextResetAndDeleteChildren(winstate->peragg[i].aggcontext); + } if (winstate->buffer) tuplestore_end(winstate->buffer); @@ -1450,7 +1790,12 @@ ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags) ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); - /* Create mid-lived context for aggregate trans values etc */ + /* + * Create mid-lived context for aggregate trans values etc. + * + * Note that moving aggregates each use their own private context, not + * this one. 
+ */ winstate->aggcontext = AllocSetContextCreate(CurrentMemoryContext, "WindowAgg_Aggregates", @@ -1657,12 +2002,10 @@ void ExecEndWindowAgg(WindowAggState *node) { PlanState *outerPlan; + int i; release_partition(node); - pfree(node->perfunc); - pfree(node->peragg); - ExecClearTuple(node->ss.ss_ScanTupleSlot); ExecClearTuple(node->first_part_slot); ExecClearTuple(node->agg_row_slot); @@ -1676,9 +2019,17 @@ ExecEndWindowAgg(WindowAggState *node) node->ss.ps.ps_ExprContext = node->tmpcontext; ExecFreeExprContext(&node->ss.ps); + for (i = 0; i < node->numaggs; i++) + { + if (node->peragg[i].aggcontext != node->aggcontext) + MemoryContextDelete(node->peragg[i].aggcontext); + } MemoryContextDelete(node->partcontext); MemoryContextDelete(node->aggcontext); + pfree(node->perfunc); + pfree(node->peragg); + outerPlan = outerPlanState(node); ExecEndNode(outerPlan); } @@ -1733,10 +2084,13 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, HeapTuple aggTuple; Form_pg_aggregate aggform; Oid aggtranstype; + AttrNumber initvalAttNo; AclResult aclresult; Oid transfn_oid, + invtransfn_oid, finalfn_oid; Expr *transfnexpr, + *invtransfnexpr, *finalfnexpr; Datum textInitVal; int i; @@ -1756,14 +2110,40 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, wfunc->winfnoid); aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple); + /* + * Figure out whether we want to use the moving-aggregate implementation, + * and collect the right set of fields from the pg_aggregate entry. + * + * If the frame head can't move, we don't need moving-aggregate code. Even + * if we'd like to use it, don't do so if the aggregate's arguments (and + * FILTER clause if any) contain any calls to volatile functions. + * Otherwise, the difference between restarting and not restarting the + * aggregation would be user-visible. 
+ */ + if (OidIsValid(aggform->aggminvtransfn) && + !(winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) && + !contain_volatile_functions((Node *) wfunc)) + { + peraggstate->transfn_oid = transfn_oid = aggform->aggmtransfn; + peraggstate->invtransfn_oid = invtransfn_oid = aggform->aggminvtransfn; + peraggstate->finalfn_oid = finalfn_oid = aggform->aggmfinalfn; + aggtranstype = aggform->aggmtranstype; + initvalAttNo = Anum_pg_aggregate_aggminitval; + } + else + { + peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn; + peraggstate->invtransfn_oid = invtransfn_oid = InvalidOid; + peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn; + aggtranstype = aggform->aggtranstype; + initvalAttNo = Anum_pg_aggregate_agginitval; + } + /* * ExecInitWindowAgg already checked permission to call aggregate function * ... but we still need to check the component functions */ - peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn; - peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn; - /* Check that aggregate owner has permission to call component fns */ { HeapTuple procTuple; @@ -1783,6 +2163,17 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(transfn_oid)); InvokeFunctionExecuteHook(transfn_oid); + + if (OidIsValid(invtransfn_oid)) + { + aclresult = pg_proc_aclcheck(invtransfn_oid, aggOwner, + ACL_EXECUTE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_PROC, + get_func_name(invtransfn_oid)); + InvokeFunctionExecuteHook(invtransfn_oid); + } + if (OidIsValid(finalfn_oid)) { aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner, @@ -1796,7 +2187,7 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, /* resolve actual type of transition state, if polymorphic */ aggtranstype = resolve_aggregate_transtype(wfunc->winfnoid, - aggform->aggtranstype, + aggtranstype, inputTypes, numArguments); @@ -1810,13 +2201,21 @@ 
initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, wfunc->wintype, wfunc->inputcollid, transfn_oid, + invtransfn_oid, finalfn_oid, &transfnexpr, + &invtransfnexpr, &finalfnexpr); fmgr_info(transfn_oid, &peraggstate->transfn); fmgr_info_set_expr((Node *) transfnexpr, &peraggstate->transfn); + if (OidIsValid(invtransfn_oid)) + { + fmgr_info(invtransfn_oid, &peraggstate->invtransfn); + fmgr_info_set_expr((Node *) invtransfnexpr, &peraggstate->invtransfn); + } + if (OidIsValid(finalfn_oid)) { fmgr_info(finalfn_oid, &peraggstate->finalfn); @@ -1834,8 +2233,7 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, * initval is potentially null, so don't try to access it as a struct * field. Must do it the hard way with SysCacheGetAttr. */ - textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, - Anum_pg_aggregate_agginitval, + textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, initvalAttNo, &peraggstate->initValueIsNull); if (peraggstate->initValueIsNull) @@ -1848,7 +2246,8 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, * If the transfn is strict and the initval is NULL, make sure input type * and transtype are the same (or at least binary-compatible), so that * it's OK to use the first input value as the initial transValue. This - * should have been checked at agg definition time, but just in case... + * should have been checked at agg definition time, but we must check + * again in case the transfn's strictness property has been changed. */ if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull) { @@ -1860,6 +2259,44 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, wfunc->winfnoid))); } + /* + * Insist that forward and inverse transition functions have the same + * strictness setting. Allowing them to differ would require handling + * more special cases in advance_windowaggregate and + * advance_windowaggregate_base, for no discernible benefit. 
This should + * have been checked at agg definition time, but we must check again in + * case either function's strictness property has been changed. + */ + if (OidIsValid(invtransfn_oid) && + peraggstate->transfn.fn_strict != peraggstate->invtransfn.fn_strict) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("strictness of aggregate's forward and inverse transition functions must match"))); + + /* + * Moving aggregates use their own aggcontext. + * + * This is necessary because they might restart at different times, so we + * might never be able to reset the shared context otherwise. We can't + * make it the aggregates' responsibility to clean up after themselves, + * because strict aggregates must be restarted whenever we remove their + * last non-NULL input, which the aggregate won't be aware is happening. + * Also, just pfree()ing the transValue upon restarting wouldn't help, + * since we'd miss any indirectly referenced data. We could, in theory, + * make the memory allocation rules for moving aggregates different than + * they have historically been for plain aggregates, but that seems grotty + * and likely to lead to memory leaks. + */ + if (OidIsValid(invtransfn_oid)) + peraggstate->aggcontext = + AllocSetContextCreate(CurrentMemoryContext, + "WindowAgg_AggregatePrivate", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + else + peraggstate->aggcontext = winstate->aggcontext; + ReleaseSysCache(aggTuple); return peraggstate; diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index 201529b885..3f307e6464 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -471,7 +471,11 @@ count_agg_clauses_walker(Node *node, count_agg_clauses_context *context) Assert(aggref->agglevelsup == 0); - /* fetch info about aggregate from pg_aggregate */ + /* + * Fetch info about aggregate from pg_aggregate. 
Note it's correct to + * ignore the moving-aggregate variant, since what we're concerned + * with here is aggregates not window functions. + */ aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(aggref->aggfnoid)); if (!HeapTupleIsValid(aggTuple)) diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c index 9613e2aec8..272d27f919 100644 --- a/src/backend/parser/parse_agg.c +++ b/src/backend/parser/parse_agg.c @@ -1187,11 +1187,13 @@ resolve_aggregate_transtype(Oid aggfuncid, * For an ordered-set aggregate, remember that agg_input_types describes * the direct arguments followed by the aggregated arguments. * - * transfn_oid and finalfn_oid identify the funcs to be called; the latter - * may be InvalidOid. + * transfn_oid, invtransfn_oid and finalfn_oid identify the funcs to be + * called; the latter two may be InvalidOid. * - * Pointers to the constructed trees are returned into *transfnexpr and - * *finalfnexpr. The latter is set to NULL if there's no finalfn. + * Pointers to the constructed trees are returned into *transfnexpr, + * *invtransfnexpr and *finalfnexpr. If there is no invtransfn or finalfn, + * the respective pointers are set to NULL. Since use of the invtransfn is + * optional, NULL may be passed for invtransfnexpr. 
*/ void build_aggregate_fnexprs(Oid *agg_input_types, @@ -1203,8 +1205,10 @@ build_aggregate_fnexprs(Oid *agg_input_types, Oid agg_result_type, Oid agg_input_collation, Oid transfn_oid, + Oid invtransfn_oid, Oid finalfn_oid, Expr **transfnexpr, + Expr **invtransfnexpr, Expr **finalfnexpr) { Param *argp; @@ -1249,6 +1253,26 @@ build_aggregate_fnexprs(Oid *agg_input_types, fexpr->funcvariadic = agg_variadic; *transfnexpr = (Expr *) fexpr; + /* + * Build invtransfn expression if requested, with same args as transfn + */ + if (invtransfnexpr != NULL) + { + if (OidIsValid(invtransfn_oid)) + { + fexpr = makeFuncExpr(invtransfn_oid, + agg_state_type, + args, + InvalidOid, + agg_input_collation, + COERCE_EXPLICIT_CALL); + fexpr->funcvariadic = agg_variadic; + *invtransfnexpr = (Expr *) fexpr; + } + else + *invtransfnexpr = NULL; + } + /* see if we have a final function */ if (!OidIsValid(finalfn_oid)) { diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 2653ef052b..a6c0428501 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -11548,20 +11548,32 @@ dumpAgg(Archive *fout, AggInfo *agginfo) PGresult *res; int i_aggtransfn; int i_aggfinalfn; + int i_aggmtransfn; + int i_aggminvtransfn; + int i_aggmfinalfn; int i_aggsortop; int i_hypothetical; int i_aggtranstype; int i_aggtransspace; + int i_aggmtranstype; + int i_aggmtransspace; int i_agginitval; + int i_aggminitval; int i_convertok; const char *aggtransfn; const char *aggfinalfn; + const char *aggmtransfn; + const char *aggminvtransfn; + const char *aggmfinalfn; const char *aggsortop; char *aggsortconvop; bool hypothetical; const char *aggtranstype; const char *aggtransspace; + const char *aggmtranstype; + const char *aggmtransspace; const char *agginitval; + const char *aggminitval; bool convertok; /* Skip if not to be dumped */ @@ -11582,9 +11594,12 @@ dumpAgg(Archive *fout, AggInfo *agginfo) { appendPQExpBuffer(query, "SELECT aggtransfn, " "aggfinalfn, 
aggtranstype::pg_catalog.regtype, " + "aggmtransfn, aggminvtransfn, aggmfinalfn, " + "aggmtranstype::pg_catalog.regtype, " "aggsortop::pg_catalog.regoperator, " "(aggkind = 'h') as hypothetical, " "aggtransspace, agginitval, " + "aggmtransspace, aggminitval, " "'t'::boolean AS convertok, " "pg_catalog.pg_get_function_arguments(p.oid) AS funcargs, " "pg_catalog.pg_get_function_identity_arguments(p.oid) AS funciargs " @@ -11597,9 +11612,12 @@ dumpAgg(Archive *fout, AggInfo *agginfo) { appendPQExpBuffer(query, "SELECT aggtransfn, " "aggfinalfn, aggtranstype::pg_catalog.regtype, " + "'-' AS aggmtransfn, '-' AS aggminvtransfn, " + "'-' AS aggmfinalfn, 0 AS aggmtranstype, " "aggsortop::pg_catalog.regoperator, " "false as hypothetical, " "0 AS aggtransspace, agginitval, " + "0 AS aggmtransspace, NULL AS aggminitval, " "'t'::boolean AS convertok, " "pg_catalog.pg_get_function_arguments(p.oid) AS funcargs, " "pg_catalog.pg_get_function_identity_arguments(p.oid) AS funciargs " @@ -11612,9 +11630,12 @@ dumpAgg(Archive *fout, AggInfo *agginfo) { appendPQExpBuffer(query, "SELECT aggtransfn, " "aggfinalfn, aggtranstype::pg_catalog.regtype, " + "'-' AS aggmtransfn, '-' AS aggminvtransfn, " + "'-' AS aggmfinalfn, 0 AS aggmtranstype, " "aggsortop::pg_catalog.regoperator, " "false as hypothetical, " "0 AS aggtransspace, agginitval, " + "0 AS aggmtransspace, NULL AS aggminitval, " "'t'::boolean AS convertok " "FROM pg_catalog.pg_aggregate a, pg_catalog.pg_proc p " "WHERE a.aggfnoid = p.oid " @@ -11625,9 +11646,12 @@ dumpAgg(Archive *fout, AggInfo *agginfo) { appendPQExpBuffer(query, "SELECT aggtransfn, " "aggfinalfn, aggtranstype::pg_catalog.regtype, " + "'-' AS aggmtransfn, '-' AS aggminvtransfn, " + "'-' AS aggmfinalfn, 0 AS aggmtranstype, " "0 AS aggsortop, " "'f'::boolean as hypothetical, " "0 AS aggtransspace, agginitval, " + "0 AS aggmtransspace, NULL AS aggminitval, " "'t'::boolean AS convertok " "FROM pg_catalog.pg_aggregate a, pg_catalog.pg_proc p " "WHERE a.aggfnoid = p.oid 
" @@ -11638,9 +11662,12 @@ dumpAgg(Archive *fout, AggInfo *agginfo) { appendPQExpBuffer(query, "SELECT aggtransfn, aggfinalfn, " "format_type(aggtranstype, NULL) AS aggtranstype, " + "'-' AS aggmtransfn, '-' AS aggminvtransfn, " + "'-' AS aggmfinalfn, 0 AS aggmtranstype, " "0 AS aggsortop, " "'f'::boolean as hypothetical, " "0 AS aggtransspace, agginitval, " + "0 AS aggmtransspace, NULL AS aggminitval, " "'t'::boolean AS convertok " "FROM pg_aggregate " "WHERE oid = '%u'::oid", @@ -11651,9 +11678,12 @@ dumpAgg(Archive *fout, AggInfo *agginfo) appendPQExpBuffer(query, "SELECT aggtransfn1 AS aggtransfn, " "aggfinalfn, " "(SELECT typname FROM pg_type WHERE oid = aggtranstype1) AS aggtranstype, " + "'-' AS aggmtransfn, '-' AS aggminvtransfn, " + "'-' AS aggmfinalfn, 0 AS aggmtranstype, " "0 AS aggsortop, " "'f'::boolean as hypothetical, " "0 AS aggtransspace, agginitval1 AS agginitval, " + "0 AS aggmtransspace, NULL AS aggminitval, " "(aggtransfn2 = 0 and aggtranstype2 = 0 and agginitval2 is null) AS convertok " "FROM pg_aggregate " "WHERE oid = '%u'::oid", @@ -11664,20 +11694,32 @@ dumpAgg(Archive *fout, AggInfo *agginfo) i_aggtransfn = PQfnumber(res, "aggtransfn"); i_aggfinalfn = PQfnumber(res, "aggfinalfn"); + i_aggmtransfn = PQfnumber(res, "aggmtransfn"); + i_aggminvtransfn = PQfnumber(res, "aggminvtransfn"); + i_aggmfinalfn = PQfnumber(res, "aggmfinalfn"); i_aggsortop = PQfnumber(res, "aggsortop"); i_hypothetical = PQfnumber(res, "hypothetical"); i_aggtranstype = PQfnumber(res, "aggtranstype"); i_aggtransspace = PQfnumber(res, "aggtransspace"); + i_aggmtranstype = PQfnumber(res, "aggmtranstype"); + i_aggmtransspace = PQfnumber(res, "aggmtransspace"); i_agginitval = PQfnumber(res, "agginitval"); + i_aggminitval = PQfnumber(res, "aggminitval"); i_convertok = PQfnumber(res, "convertok"); aggtransfn = PQgetvalue(res, 0, i_aggtransfn); aggfinalfn = PQgetvalue(res, 0, i_aggfinalfn); + aggmtransfn = PQgetvalue(res, 0, i_aggmtransfn); + aggminvtransfn = PQgetvalue(res, 0, 
i_aggminvtransfn); + aggmfinalfn = PQgetvalue(res, 0, i_aggmfinalfn); aggsortop = PQgetvalue(res, 0, i_aggsortop); hypothetical = (PQgetvalue(res, 0, i_hypothetical)[0] == 't'); aggtranstype = PQgetvalue(res, 0, i_aggtranstype); aggtransspace = PQgetvalue(res, 0, i_aggtransspace); + aggmtranstype = PQgetvalue(res, 0, i_aggmtranstype); + aggmtransspace = PQgetvalue(res, 0, i_aggmtransspace); agginitval = PQgetvalue(res, 0, i_agginitval); + aggminitval = PQgetvalue(res, 0, i_aggminitval); convertok = (PQgetvalue(res, 0, i_convertok)[0] == 't'); if (fout->remoteVersion >= 80400) @@ -11751,6 +11793,32 @@ dumpAgg(Archive *fout, AggInfo *agginfo) aggfinalfn); } + if (strcmp(aggmtransfn, "-") != 0) + { + appendPQExpBuffer(details, ",\n MSFUNC = %s,\n MINVFUNC = %s,\n MSTYPE = %s", + aggmtransfn, + aggminvtransfn, + aggmtranstype); + } + + if (strcmp(aggmtransspace, "0") != 0) + { + appendPQExpBuffer(details, ",\n MSSPACE = %s", + aggmtransspace); + } + + if (!PQgetisnull(res, 0, i_aggminitval)) + { + appendPQExpBufferStr(details, ",\n MINITCOND = "); + appendStringLiteralAH(details, aggminitval, fout); + } + + if (strcmp(aggmfinalfn, "-") != 0) + { + appendPQExpBuffer(details, ",\n MFINALFUNC = %s", + aggmfinalfn); + } + aggsortconvop = convertOperatorReference(fout, aggsortop); if (aggsortconvop) { diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index fe6144e2d3..2fb0ce8656 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 201404082 +#define CATALOG_VERSION_NO 201404121 #endif diff --git a/src/include/catalog/pg_aggregate.h b/src/include/catalog/pg_aggregate.h index f189998597..3cb0d754e7 100644 --- a/src/include/catalog/pg_aggregate.h +++ b/src/include/catalog/pg_aggregate.h @@ -32,10 +32,16 @@ * aggnumdirectargs number of arguments that are "direct" arguments * aggtransfn transition function * aggfinalfn final function (0 if none) + * 
aggmtransfn forward function for moving-aggregate mode (0 if none) + * aggminvtransfn inverse function for moving-aggregate mode (0 if none) + * aggmfinalfn final function for moving-aggregate mode (0 if none) * aggsortop associated sort operator (0 if none) * aggtranstype type of aggregate's transition (state) data * aggtransspace estimated size of state data (0 for default estimate) + * aggmtranstype type of moving-aggregate state data (0 if none) + * aggmtransspace estimated size of moving-agg state (0 for default est) * agginitval initial value for transition state (can be NULL) + * aggminitval initial value for moving-agg state (can be NULL) * ---------------------------------------------------------------- */ #define AggregateRelationId 2600 @@ -47,12 +53,18 @@ CATALOG(pg_aggregate,2600) BKI_WITHOUT_OIDS int16 aggnumdirectargs; regproc aggtransfn; regproc aggfinalfn; + regproc aggmtransfn; + regproc aggminvtransfn; + regproc aggmfinalfn; Oid aggsortop; Oid aggtranstype; int32 aggtransspace; + Oid aggmtranstype; + int32 aggmtransspace; #ifdef CATALOG_VARLEN /* variable-length fields start here */ text agginitval; + text aggminitval; #endif } FormData_pg_aggregate; @@ -68,16 +80,22 @@ typedef FormData_pg_aggregate *Form_pg_aggregate; * ---------------- */ -#define Natts_pg_aggregate 9 +#define Natts_pg_aggregate 15 #define Anum_pg_aggregate_aggfnoid 1 #define Anum_pg_aggregate_aggkind 2 #define Anum_pg_aggregate_aggnumdirectargs 3 #define Anum_pg_aggregate_aggtransfn 4 #define Anum_pg_aggregate_aggfinalfn 5 -#define Anum_pg_aggregate_aggsortop 6 -#define Anum_pg_aggregate_aggtranstype 7 -#define Anum_pg_aggregate_aggtransspace 8 -#define Anum_pg_aggregate_agginitval 9 +#define Anum_pg_aggregate_aggmtransfn 6 +#define Anum_pg_aggregate_aggminvtransfn 7 +#define Anum_pg_aggregate_aggmfinalfn 8 +#define Anum_pg_aggregate_aggsortop 9 +#define Anum_pg_aggregate_aggtranstype 10 +#define Anum_pg_aggregate_aggtransspace 11 +#define Anum_pg_aggregate_aggmtranstype 12 
+#define Anum_pg_aggregate_aggmtransspace 13 +#define Anum_pg_aggregate_agginitval 14 +#define Anum_pg_aggregate_aggminitval 15 /* * Symbolic values for aggkind column. We distinguish normal aggregates @@ -101,177 +119,177 @@ typedef FormData_pg_aggregate *Form_pg_aggregate; */ /* avg */ -DATA(insert ( 2100 n 0 int8_avg_accum numeric_avg 0 2281 128 _null_ )); -DATA(insert ( 2101 n 0 int4_avg_accum int8_avg 0 1016 0 "{0,0}" )); -DATA(insert ( 2102 n 0 int2_avg_accum int8_avg 0 1016 0 "{0,0}" )); -DATA(insert ( 2103 n 0 numeric_avg_accum numeric_avg 0 2281 128 _null_ )); -DATA(insert ( 2104 n 0 float4_accum float8_avg 0 1022 0 "{0,0,0}" )); -DATA(insert ( 2105 n 0 float8_accum float8_avg 0 1022 0 "{0,0,0}" )); -DATA(insert ( 2106 n 0 interval_accum interval_avg 0 1187 0 "{0 second,0 second}" )); +DATA(insert ( 2100 n 0 int8_avg_accum numeric_avg - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2101 n 0 int4_avg_accum int8_avg - - - 0 1016 0 0 0 "{0,0}" _null_ )); +DATA(insert ( 2102 n 0 int2_avg_accum int8_avg - - - 0 1016 0 0 0 "{0,0}" _null_ )); +DATA(insert ( 2103 n 0 numeric_avg_accum numeric_avg - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2104 n 0 float4_accum float8_avg - - - 0 1022 0 0 0 "{0,0,0}" _null_ )); +DATA(insert ( 2105 n 0 float8_accum float8_avg - - - 0 1022 0 0 0 "{0,0,0}" _null_ )); +DATA(insert ( 2106 n 0 interval_accum interval_avg - - - 0 1187 0 0 0 "{0 second,0 second}" _null_ )); /* sum */ -DATA(insert ( 2107 n 0 int8_avg_accum numeric_sum 0 2281 128 _null_ )); -DATA(insert ( 2108 n 0 int4_sum - 0 20 0 _null_ )); -DATA(insert ( 2109 n 0 int2_sum - 0 20 0 _null_ )); -DATA(insert ( 2110 n 0 float4pl - 0 700 0 _null_ )); -DATA(insert ( 2111 n 0 float8pl - 0 701 0 _null_ )); -DATA(insert ( 2112 n 0 cash_pl - 0 790 0 _null_ )); -DATA(insert ( 2113 n 0 interval_pl - 0 1186 0 _null_ )); -DATA(insert ( 2114 n 0 numeric_avg_accum numeric_sum 0 2281 128 _null_ )); +DATA(insert ( 2107 n 0 int8_avg_accum numeric_sum - - - 0 2281 128 0 0 
_null_ _null_ )); +DATA(insert ( 2108 n 0 int4_sum - - - - 0 20 0 0 0 _null_ _null_ )); +DATA(insert ( 2109 n 0 int2_sum - - - - 0 20 0 0 0 _null_ _null_ )); +DATA(insert ( 2110 n 0 float4pl - - - - 0 700 0 0 0 _null_ _null_ )); +DATA(insert ( 2111 n 0 float8pl - - - - 0 701 0 0 0 _null_ _null_ )); +DATA(insert ( 2112 n 0 cash_pl - - - - 0 790 0 0 0 _null_ _null_ )); +DATA(insert ( 2113 n 0 interval_pl - - - - 0 1186 0 0 0 _null_ _null_ )); +DATA(insert ( 2114 n 0 numeric_avg_accum numeric_sum - - - 0 2281 128 0 0 _null_ _null_ )); /* max */ -DATA(insert ( 2115 n 0 int8larger - 413 20 0 _null_ )); -DATA(insert ( 2116 n 0 int4larger - 521 23 0 _null_ )); -DATA(insert ( 2117 n 0 int2larger - 520 21 0 _null_ )); -DATA(insert ( 2118 n 0 oidlarger - 610 26 0 _null_ )); -DATA(insert ( 2119 n 0 float4larger - 623 700 0 _null_ )); -DATA(insert ( 2120 n 0 float8larger - 674 701 0 _null_ )); -DATA(insert ( 2121 n 0 int4larger - 563 702 0 _null_ )); -DATA(insert ( 2122 n 0 date_larger - 1097 1082 0 _null_ )); -DATA(insert ( 2123 n 0 time_larger - 1112 1083 0 _null_ )); -DATA(insert ( 2124 n 0 timetz_larger - 1554 1266 0 _null_ )); -DATA(insert ( 2125 n 0 cashlarger - 903 790 0 _null_ )); -DATA(insert ( 2126 n 0 timestamp_larger - 2064 1114 0 _null_ )); -DATA(insert ( 2127 n 0 timestamptz_larger - 1324 1184 0 _null_ )); -DATA(insert ( 2128 n 0 interval_larger - 1334 1186 0 _null_ )); -DATA(insert ( 2129 n 0 text_larger - 666 25 0 _null_ )); -DATA(insert ( 2130 n 0 numeric_larger - 1756 1700 0 _null_ )); -DATA(insert ( 2050 n 0 array_larger - 1073 2277 0 _null_ )); -DATA(insert ( 2244 n 0 bpchar_larger - 1060 1042 0 _null_ )); -DATA(insert ( 2797 n 0 tidlarger - 2800 27 0 _null_ )); -DATA(insert ( 3526 n 0 enum_larger - 3519 3500 0 _null_ )); +DATA(insert ( 2115 n 0 int8larger - - - - 413 20 0 0 0 _null_ _null_ )); +DATA(insert ( 2116 n 0 int4larger - - - - 521 23 0 0 0 _null_ _null_ )); +DATA(insert ( 2117 n 0 int2larger - - - - 520 21 0 0 0 _null_ _null_ )); +DATA(insert ( 
2118 n 0 oidlarger - - - - 610 26 0 0 0 _null_ _null_ )); +DATA(insert ( 2119 n 0 float4larger - - - - 623 700 0 0 0 _null_ _null_ )); +DATA(insert ( 2120 n 0 float8larger - - - - 674 701 0 0 0 _null_ _null_ )); +DATA(insert ( 2121 n 0 int4larger - - - - 563 702 0 0 0 _null_ _null_ )); +DATA(insert ( 2122 n 0 date_larger - - - - 1097 1082 0 0 0 _null_ _null_ )); +DATA(insert ( 2123 n 0 time_larger - - - - 1112 1083 0 0 0 _null_ _null_ )); +DATA(insert ( 2124 n 0 timetz_larger - - - - 1554 1266 0 0 0 _null_ _null_ )); +DATA(insert ( 2125 n 0 cashlarger - - - - 903 790 0 0 0 _null_ _null_ )); +DATA(insert ( 2126 n 0 timestamp_larger - - - - 2064 1114 0 0 0 _null_ _null_ )); +DATA(insert ( 2127 n 0 timestamptz_larger - - - - 1324 1184 0 0 0 _null_ _null_ )); +DATA(insert ( 2128 n 0 interval_larger - - - - 1334 1186 0 0 0 _null_ _null_ )); +DATA(insert ( 2129 n 0 text_larger - - - - 666 25 0 0 0 _null_ _null_ )); +DATA(insert ( 2130 n 0 numeric_larger - - - - 1756 1700 0 0 0 _null_ _null_ )); +DATA(insert ( 2050 n 0 array_larger - - - - 1073 2277 0 0 0 _null_ _null_ )); +DATA(insert ( 2244 n 0 bpchar_larger - - - - 1060 1042 0 0 0 _null_ _null_ )); +DATA(insert ( 2797 n 0 tidlarger - - - - 2800 27 0 0 0 _null_ _null_ )); +DATA(insert ( 3526 n 0 enum_larger - - - - 3519 3500 0 0 0 _null_ _null_ )); /* min */ -DATA(insert ( 2131 n 0 int8smaller - 412 20 0 _null_ )); -DATA(insert ( 2132 n 0 int4smaller - 97 23 0 _null_ )); -DATA(insert ( 2133 n 0 int2smaller - 95 21 0 _null_ )); -DATA(insert ( 2134 n 0 oidsmaller - 609 26 0 _null_ )); -DATA(insert ( 2135 n 0 float4smaller - 622 700 0 _null_ )); -DATA(insert ( 2136 n 0 float8smaller - 672 701 0 _null_ )); -DATA(insert ( 2137 n 0 int4smaller - 562 702 0 _null_ )); -DATA(insert ( 2138 n 0 date_smaller - 1095 1082 0 _null_ )); -DATA(insert ( 2139 n 0 time_smaller - 1110 1083 0 _null_ )); -DATA(insert ( 2140 n 0 timetz_smaller - 1552 1266 0 _null_ )); -DATA(insert ( 2141 n 0 cashsmaller - 902 790 0 _null_ )); -DATA(insert ( 
2142 n 0 timestamp_smaller - 2062 1114 0 _null_ )); -DATA(insert ( 2143 n 0 timestamptz_smaller - 1322 1184 0 _null_ )); -DATA(insert ( 2144 n 0 interval_smaller - 1332 1186 0 _null_ )); -DATA(insert ( 2145 n 0 text_smaller - 664 25 0 _null_ )); -DATA(insert ( 2146 n 0 numeric_smaller - 1754 1700 0 _null_ )); -DATA(insert ( 2051 n 0 array_smaller - 1072 2277 0 _null_ )); -DATA(insert ( 2245 n 0 bpchar_smaller - 1058 1042 0 _null_ )); -DATA(insert ( 2798 n 0 tidsmaller - 2799 27 0 _null_ )); -DATA(insert ( 3527 n 0 enum_smaller - 3518 3500 0 _null_ )); +DATA(insert ( 2131 n 0 int8smaller - - - - 412 20 0 0 0 _null_ _null_ )); +DATA(insert ( 2132 n 0 int4smaller - - - - 97 23 0 0 0 _null_ _null_ )); +DATA(insert ( 2133 n 0 int2smaller - - - - 95 21 0 0 0 _null_ _null_ )); +DATA(insert ( 2134 n 0 oidsmaller - - - - 609 26 0 0 0 _null_ _null_ )); +DATA(insert ( 2135 n 0 float4smaller - - - - 622 700 0 0 0 _null_ _null_ )); +DATA(insert ( 2136 n 0 float8smaller - - - - 672 701 0 0 0 _null_ _null_ )); +DATA(insert ( 2137 n 0 int4smaller - - - - 562 702 0 0 0 _null_ _null_ )); +DATA(insert ( 2138 n 0 date_smaller - - - - 1095 1082 0 0 0 _null_ _null_ )); +DATA(insert ( 2139 n 0 time_smaller - - - - 1110 1083 0 0 0 _null_ _null_ )); +DATA(insert ( 2140 n 0 timetz_smaller - - - - 1552 1266 0 0 0 _null_ _null_ )); +DATA(insert ( 2141 n 0 cashsmaller - - - - 902 790 0 0 0 _null_ _null_ )); +DATA(insert ( 2142 n 0 timestamp_smaller - - - - 2062 1114 0 0 0 _null_ _null_ )); +DATA(insert ( 2143 n 0 timestamptz_smaller - - - - 1322 1184 0 0 0 _null_ _null_ )); +DATA(insert ( 2144 n 0 interval_smaller - - - - 1332 1186 0 0 0 _null_ _null_ )); +DATA(insert ( 2145 n 0 text_smaller - - - - 664 25 0 0 0 _null_ _null_ )); +DATA(insert ( 2146 n 0 numeric_smaller - - - - 1754 1700 0 0 0 _null_ _null_ )); +DATA(insert ( 2051 n 0 array_smaller - - - - 1072 2277 0 0 0 _null_ _null_ )); +DATA(insert ( 2245 n 0 bpchar_smaller - - - - 1058 1042 0 0 0 _null_ _null_ )); +DATA(insert ( 2798 n 0 
tidsmaller - - - - 2799 27 0 0 0 _null_ _null_ )); +DATA(insert ( 3527 n 0 enum_smaller - - - - 3518 3500 0 0 0 _null_ _null_ )); /* count */ -DATA(insert ( 2147 n 0 int8inc_any - 0 20 0 "0" )); -DATA(insert ( 2803 n 0 int8inc - 0 20 0 "0" )); +DATA(insert ( 2147 n 0 int8inc_any - - - - 0 20 0 0 0 "0" _null_ )); +DATA(insert ( 2803 n 0 int8inc - - - - 0 20 0 0 0 "0" _null_ )); /* var_pop */ -DATA(insert ( 2718 n 0 int8_accum numeric_var_pop 0 2281 128 _null_ )); -DATA(insert ( 2719 n 0 int4_accum numeric_var_pop 0 2281 128 _null_ )); -DATA(insert ( 2720 n 0 int2_accum numeric_var_pop 0 2281 128 _null_ )); -DATA(insert ( 2721 n 0 float4_accum float8_var_pop 0 1022 0 "{0,0,0}" )); -DATA(insert ( 2722 n 0 float8_accum float8_var_pop 0 1022 0 "{0,0,0}" )); -DATA(insert ( 2723 n 0 numeric_accum numeric_var_pop 0 2281 128 _null_ )); +DATA(insert ( 2718 n 0 int8_accum numeric_var_pop - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2719 n 0 int4_accum numeric_var_pop - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2720 n 0 int2_accum numeric_var_pop - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2721 n 0 float4_accum float8_var_pop - - - 0 1022 0 0 0 "{0,0,0}" _null_ )); +DATA(insert ( 2722 n 0 float8_accum float8_var_pop - - - 0 1022 0 0 0 "{0,0,0}" _null_ )); +DATA(insert ( 2723 n 0 numeric_accum numeric_var_pop - - - 0 2281 128 0 0 _null_ _null_ )); /* var_samp */ -DATA(insert ( 2641 n 0 int8_accum numeric_var_samp 0 2281 128 _null_ )); -DATA(insert ( 2642 n 0 int4_accum numeric_var_samp 0 2281 128 _null_ )); -DATA(insert ( 2643 n 0 int2_accum numeric_var_samp 0 2281 128 _null_ )); -DATA(insert ( 2644 n 0 float4_accum float8_var_samp 0 1022 0 "{0,0,0}" )); -DATA(insert ( 2645 n 0 float8_accum float8_var_samp 0 1022 0 "{0,0,0}" )); -DATA(insert ( 2646 n 0 numeric_accum numeric_var_samp 0 2281 128 _null_ )); +DATA(insert ( 2641 n 0 int8_accum numeric_var_samp - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2642 n 0 int4_accum numeric_var_samp - - 
- 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2643 n 0 int2_accum numeric_var_samp - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2644 n 0 float4_accum float8_var_samp - - - 0 1022 0 0 0 "{0,0,0}" _null_ )); +DATA(insert ( 2645 n 0 float8_accum float8_var_samp - - - 0 1022 0 0 0 "{0,0,0}" _null_ )); +DATA(insert ( 2646 n 0 numeric_accum numeric_var_samp - - - 0 2281 128 0 0 _null_ _null_ )); /* variance: historical Postgres syntax for var_samp */ -DATA(insert ( 2148 n 0 int8_accum numeric_var_samp 0 2281 128 _null_ )); -DATA(insert ( 2149 n 0 int4_accum numeric_var_samp 0 2281 128 _null_ )); -DATA(insert ( 2150 n 0 int2_accum numeric_var_samp 0 2281 128 _null_ )); -DATA(insert ( 2151 n 0 float4_accum float8_var_samp 0 1022 0 "{0,0,0}" )); -DATA(insert ( 2152 n 0 float8_accum float8_var_samp 0 1022 0 "{0,0,0}" )); -DATA(insert ( 2153 n 0 numeric_accum numeric_var_samp 0 2281 128 _null_ )); +DATA(insert ( 2148 n 0 int8_accum numeric_var_samp - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2149 n 0 int4_accum numeric_var_samp - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2150 n 0 int2_accum numeric_var_samp - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2151 n 0 float4_accum float8_var_samp - - - 0 1022 0 0 0 "{0,0,0}" _null_ )); +DATA(insert ( 2152 n 0 float8_accum float8_var_samp - - - 0 1022 0 0 0 "{0,0,0}" _null_ )); +DATA(insert ( 2153 n 0 numeric_accum numeric_var_samp - - - 0 2281 128 0 0 _null_ _null_ )); /* stddev_pop */ -DATA(insert ( 2724 n 0 int8_accum numeric_stddev_pop 0 2281 128 _null_ )); -DATA(insert ( 2725 n 0 int4_accum numeric_stddev_pop 0 2281 128 _null_ )); -DATA(insert ( 2726 n 0 int2_accum numeric_stddev_pop 0 2281 128 _null_ )); -DATA(insert ( 2727 n 0 float4_accum float8_stddev_pop 0 1022 0 "{0,0,0}" )); -DATA(insert ( 2728 n 0 float8_accum float8_stddev_pop 0 1022 0 "{0,0,0}" )); -DATA(insert ( 2729 n 0 numeric_accum numeric_stddev_pop 0 2281 128 _null_ )); +DATA(insert ( 2724 n 0 int8_accum numeric_stddev_pop 
- - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2725 n 0 int4_accum numeric_stddev_pop - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2726 n 0 int2_accum numeric_stddev_pop - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2727 n 0 float4_accum float8_stddev_pop - - - 0 1022 0 0 0 "{0,0,0}" _null_ )); +DATA(insert ( 2728 n 0 float8_accum float8_stddev_pop - - - 0 1022 0 0 0 "{0,0,0}" _null_ )); +DATA(insert ( 2729 n 0 numeric_accum numeric_stddev_pop - - - 0 2281 128 0 0 _null_ _null_ )); /* stddev_samp */ -DATA(insert ( 2712 n 0 int8_accum numeric_stddev_samp 0 2281 128 _null_ )); -DATA(insert ( 2713 n 0 int4_accum numeric_stddev_samp 0 2281 128 _null_ )); -DATA(insert ( 2714 n 0 int2_accum numeric_stddev_samp 0 2281 128 _null_ )); -DATA(insert ( 2715 n 0 float4_accum float8_stddev_samp 0 1022 0 "{0,0,0}" )); -DATA(insert ( 2716 n 0 float8_accum float8_stddev_samp 0 1022 0 "{0,0,0}" )); -DATA(insert ( 2717 n 0 numeric_accum numeric_stddev_samp 0 2281 128 _null_ )); +DATA(insert ( 2712 n 0 int8_accum numeric_stddev_samp - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2713 n 0 int4_accum numeric_stddev_samp - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2714 n 0 int2_accum numeric_stddev_samp - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2715 n 0 float4_accum float8_stddev_samp - - - 0 1022 0 0 0 "{0,0,0}" _null_ )); +DATA(insert ( 2716 n 0 float8_accum float8_stddev_samp - - - 0 1022 0 0 0 "{0,0,0}" _null_ )); +DATA(insert ( 2717 n 0 numeric_accum numeric_stddev_samp - - - 0 2281 128 0 0 _null_ _null_ )); /* stddev: historical Postgres syntax for stddev_samp */ -DATA(insert ( 2154 n 0 int8_accum numeric_stddev_samp 0 2281 128 _null_ )); -DATA(insert ( 2155 n 0 int4_accum numeric_stddev_samp 0 2281 128 _null_ )); -DATA(insert ( 2156 n 0 int2_accum numeric_stddev_samp 0 2281 128 _null_ )); -DATA(insert ( 2157 n 0 float4_accum float8_stddev_samp 0 1022 0 "{0,0,0}" )); -DATA(insert ( 2158 n 0 float8_accum float8_stddev_samp 0 1022 
0 "{0,0,0}" )); -DATA(insert ( 2159 n 0 numeric_accum numeric_stddev_samp 0 2281 128 _null_ )); +DATA(insert ( 2154 n 0 int8_accum numeric_stddev_samp - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2155 n 0 int4_accum numeric_stddev_samp - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2156 n 0 int2_accum numeric_stddev_samp - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2157 n 0 float4_accum float8_stddev_samp - - - 0 1022 0 0 0 "{0,0,0}" _null_ )); +DATA(insert ( 2158 n 0 float8_accum float8_stddev_samp - - - 0 1022 0 0 0 "{0,0,0}" _null_ )); +DATA(insert ( 2159 n 0 numeric_accum numeric_stddev_samp - - - 0 2281 128 0 0 _null_ _null_ )); /* SQL2003 binary regression aggregates */ -DATA(insert ( 2818 n 0 int8inc_float8_float8 - 0 20 0 "0" )); -DATA(insert ( 2819 n 0 float8_regr_accum float8_regr_sxx 0 1022 0 "{0,0,0,0,0,0}" )); -DATA(insert ( 2820 n 0 float8_regr_accum float8_regr_syy 0 1022 0 "{0,0,0,0,0,0}" )); -DATA(insert ( 2821 n 0 float8_regr_accum float8_regr_sxy 0 1022 0 "{0,0,0,0,0,0}" )); -DATA(insert ( 2822 n 0 float8_regr_accum float8_regr_avgx 0 1022 0 "{0,0,0,0,0,0}" )); -DATA(insert ( 2823 n 0 float8_regr_accum float8_regr_avgy 0 1022 0 "{0,0,0,0,0,0}" )); -DATA(insert ( 2824 n 0 float8_regr_accum float8_regr_r2 0 1022 0 "{0,0,0,0,0,0}" )); -DATA(insert ( 2825 n 0 float8_regr_accum float8_regr_slope 0 1022 0 "{0,0,0,0,0,0}" )); -DATA(insert ( 2826 n 0 float8_regr_accum float8_regr_intercept 0 1022 0 "{0,0,0,0,0,0}" )); -DATA(insert ( 2827 n 0 float8_regr_accum float8_covar_pop 0 1022 0 "{0,0,0,0,0,0}" )); -DATA(insert ( 2828 n 0 float8_regr_accum float8_covar_samp 0 1022 0 "{0,0,0,0,0,0}" )); -DATA(insert ( 2829 n 0 float8_regr_accum float8_corr 0 1022 0 "{0,0,0,0,0,0}" )); +DATA(insert ( 2818 n 0 int8inc_float8_float8 - - - - 0 20 0 0 0 "0" _null_ )); +DATA(insert ( 2819 n 0 float8_regr_accum float8_regr_sxx - - - 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ )); +DATA(insert ( 2820 n 0 float8_regr_accum float8_regr_syy - - - 0 1022 0 0 0 
"{0,0,0,0,0,0}" _null_ )); +DATA(insert ( 2821 n 0 float8_regr_accum float8_regr_sxy - - - 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ )); +DATA(insert ( 2822 n 0 float8_regr_accum float8_regr_avgx - - - 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ )); +DATA(insert ( 2823 n 0 float8_regr_accum float8_regr_avgy - - - 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ )); +DATA(insert ( 2824 n 0 float8_regr_accum float8_regr_r2 - - - 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ )); +DATA(insert ( 2825 n 0 float8_regr_accum float8_regr_slope - - - 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ )); +DATA(insert ( 2826 n 0 float8_regr_accum float8_regr_intercept - - - 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ )); +DATA(insert ( 2827 n 0 float8_regr_accum float8_covar_pop - - - 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ )); +DATA(insert ( 2828 n 0 float8_regr_accum float8_covar_samp - - - 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ )); +DATA(insert ( 2829 n 0 float8_regr_accum float8_corr - - - 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ )); /* boolean-and and boolean-or */ -DATA(insert ( 2517 n 0 booland_statefunc - 58 16 0 _null_ )); -DATA(insert ( 2518 n 0 boolor_statefunc - 59 16 0 _null_ )); -DATA(insert ( 2519 n 0 booland_statefunc - 58 16 0 _null_ )); +DATA(insert ( 2517 n 0 booland_statefunc - - - - 58 16 0 0 0 _null_ _null_ )); +DATA(insert ( 2518 n 0 boolor_statefunc - - - - 59 16 0 0 0 _null_ _null_ )); +DATA(insert ( 2519 n 0 booland_statefunc - - - - 58 16 0 0 0 _null_ _null_ )); /* bitwise integer */ -DATA(insert ( 2236 n 0 int2and - 0 21 0 _null_ )); -DATA(insert ( 2237 n 0 int2or - 0 21 0 _null_ )); -DATA(insert ( 2238 n 0 int4and - 0 23 0 _null_ )); -DATA(insert ( 2239 n 0 int4or - 0 23 0 _null_ )); -DATA(insert ( 2240 n 0 int8and - 0 20 0 _null_ )); -DATA(insert ( 2241 n 0 int8or - 0 20 0 _null_ )); -DATA(insert ( 2242 n 0 bitand - 0 1560 0 _null_ )); -DATA(insert ( 2243 n 0 bitor - 0 1560 0 _null_ )); +DATA(insert ( 2236 n 0 int2and - - - - 0 21 0 0 0 _null_ _null_ )); +DATA(insert ( 2237 n 0 int2or - - - - 0 21 0 0 0 _null_ _null_ 
)); +DATA(insert ( 2238 n 0 int4and - - - - 0 23 0 0 0 _null_ _null_ )); +DATA(insert ( 2239 n 0 int4or - - - - 0 23 0 0 0 _null_ _null_ )); +DATA(insert ( 2240 n 0 int8and - - - - 0 20 0 0 0 _null_ _null_ )); +DATA(insert ( 2241 n 0 int8or - - - - 0 20 0 0 0 _null_ _null_ )); +DATA(insert ( 2242 n 0 bitand - - - - 0 1560 0 0 0 _null_ _null_ )); +DATA(insert ( 2243 n 0 bitor - - - - 0 1560 0 0 0 _null_ _null_ )); /* xml */ -DATA(insert ( 2901 n 0 xmlconcat2 - 0 142 0 _null_ )); +DATA(insert ( 2901 n 0 xmlconcat2 - - - - 0 142 0 0 0 _null_ _null_ )); /* array */ -DATA(insert ( 2335 n 0 array_agg_transfn array_agg_finalfn 0 2281 0 _null_ )); +DATA(insert ( 2335 n 0 array_agg_transfn array_agg_finalfn - - - 0 2281 0 0 0 _null_ _null_ )); /* text */ -DATA(insert ( 3538 n 0 string_agg_transfn string_agg_finalfn 0 2281 0 _null_ )); +DATA(insert ( 3538 n 0 string_agg_transfn string_agg_finalfn - - - 0 2281 0 0 0 _null_ _null_ )); /* bytea */ -DATA(insert ( 3545 n 0 bytea_string_agg_transfn bytea_string_agg_finalfn 0 2281 0 _null_ )); +DATA(insert ( 3545 n 0 bytea_string_agg_transfn bytea_string_agg_finalfn - - - 0 2281 0 0 0 _null_ _null_ )); /* json */ -DATA(insert ( 3175 n 0 json_agg_transfn json_agg_finalfn 0 2281 0 _null_ )); -DATA(insert ( 3197 n 0 json_object_agg_transfn json_object_agg_finalfn 0 2281 0 _null_ )); +DATA(insert ( 3175 n 0 json_agg_transfn json_agg_finalfn - - - 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 3197 n 0 json_object_agg_transfn json_object_agg_finalfn - - - 0 2281 0 0 0 _null_ _null_ )); /* ordered-set and hypothetical-set aggregates */ -DATA(insert ( 3972 o 1 ordered_set_transition percentile_disc_final 0 2281 0 _null_ )); -DATA(insert ( 3974 o 1 ordered_set_transition percentile_cont_float8_final 0 2281 0 _null_ )); -DATA(insert ( 3976 o 1 ordered_set_transition percentile_cont_interval_final 0 2281 0 _null_ )); -DATA(insert ( 3978 o 1 ordered_set_transition percentile_disc_multi_final 0 2281 0 _null_ )); -DATA(insert ( 3980 o 1 
ordered_set_transition percentile_cont_float8_multi_final 0 2281 0 _null_ )); -DATA(insert ( 3982 o 1 ordered_set_transition percentile_cont_interval_multi_final 0 2281 0 _null_ )); -DATA(insert ( 3984 o 0 ordered_set_transition mode_final 0 2281 0 _null_ )); -DATA(insert ( 3986 h 1 ordered_set_transition_multi rank_final 0 2281 0 _null_ )); -DATA(insert ( 3988 h 1 ordered_set_transition_multi percent_rank_final 0 2281 0 _null_ )); -DATA(insert ( 3990 h 1 ordered_set_transition_multi cume_dist_final 0 2281 0 _null_ )); -DATA(insert ( 3992 h 1 ordered_set_transition_multi dense_rank_final 0 2281 0 _null_ )); +DATA(insert ( 3972 o 1 ordered_set_transition percentile_disc_final - - - 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 3974 o 1 ordered_set_transition percentile_cont_float8_final - - - 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 3976 o 1 ordered_set_transition percentile_cont_interval_final - - - 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 3978 o 1 ordered_set_transition percentile_disc_multi_final - - - 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 3980 o 1 ordered_set_transition percentile_cont_float8_multi_final - - - 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 3982 o 1 ordered_set_transition percentile_cont_interval_multi_final - - - 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 3984 o 0 ordered_set_transition mode_final - - - 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 3986 h 1 ordered_set_transition_multi rank_final - - - 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 3988 h 1 ordered_set_transition_multi percent_rank_final - - - 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 3990 h 1 ordered_set_transition_multi cume_dist_final - - - 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 3992 h 1 ordered_set_transition_multi dense_rank_final - - - 0 2281 0 0 0 _null_ _null_ )); /* @@ -290,9 +308,15 @@ extern Oid AggregateCreate(const char *aggName, Oid variadicArgType, List *aggtransfnName, List *aggfinalfnName, + List *aggmtransfnName, + List 
*aggminvtransfnName, + List *aggmfinalfnName, List *aggsortopName, Oid aggTransType, int32 aggTransSpace, - const char *agginitval); + Oid aggmTransType, + int32 aggmTransSpace, + const char *agginitval, + const char *aggminitval); #endif /* PG_AGGREGATE_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index a301a08fba..6c94e8a7ae 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1762,7 +1762,8 @@ typedef struct WindowAggState Datum endOffsetValue; /* result of endOffset evaluation */ MemoryContext partcontext; /* context for partition-lifespan data */ - MemoryContext aggcontext; /* context for each aggregate data */ + MemoryContext aggcontext; /* shared context for aggregate working data */ + MemoryContext curaggcontext; /* current aggregate's working data */ ExprContext *tmpcontext; /* short-term evaluation context */ bool all_first; /* true if the scan is starting */ diff --git a/src/include/parser/parse_agg.h b/src/include/parser/parse_agg.h index 8faf991a09..938d408bb7 100644 --- a/src/include/parser/parse_agg.h +++ b/src/include/parser/parse_agg.h @@ -39,8 +39,10 @@ extern void build_aggregate_fnexprs(Oid *agg_input_types, Oid agg_result_type, Oid agg_input_collation, Oid transfn_oid, + Oid invtransfn_oid, Oid finalfn_oid, Expr **transfnexpr, + Expr **invtransfnexpr, Expr **finalfnexpr); #endif /* PARSE_AGG_H */ diff --git a/src/test/regress/expected/create_aggregate.out b/src/test/regress/expected/create_aggregate.out index ca908d91f4..a547ca535f 100644 --- a/src/test/regress/expected/create_aggregate.out +++ b/src/test/regress/expected/create_aggregate.out @@ -90,3 +90,38 @@ alter aggregate my_rank(VARIADIC "any" ORDER BY VARIADIC "any") public | test_rank | bigint | VARIADIC "any" ORDER BY VARIADIC "any" | (2 rows) +-- moving-aggregate options +CREATE AGGREGATE sumdouble (float8) +( + stype = float8, + sfunc = float8pl, + mstype = float8, + msfunc = float8pl, + minvfunc = float8mi +); +-- 
invalid: nonstrict inverse with strict forward function +CREATE FUNCTION float8mi_n(float8, float8) RETURNS float8 AS +$$ SELECT $1 - $2; $$ +LANGUAGE SQL; +CREATE AGGREGATE invalidsumdouble (float8) +( + stype = float8, + sfunc = float8pl, + mstype = float8, + msfunc = float8pl, + minvfunc = float8mi_n +); +ERROR: strictness of aggregate's forward and inverse transition functions must match +-- invalid: non-matching result types +CREATE FUNCTION float8mi_int(float8, float8) RETURNS int AS +$$ SELECT CAST($1 - $2 AS INT); $$ +LANGUAGE SQL; +CREATE AGGREGATE wrongreturntype (float8) +( + stype = float8, + sfunc = float8pl, + mstype = float8, + msfunc = float8pl, + minvfunc = float8mi_int +); +ERROR: return type of inverse transition function float8mi_int is not double precision diff --git a/src/test/regress/expected/opr_sanity.out b/src/test/regress/expected/opr_sanity.out index 118f7e43dc..93ff18d589 100644 --- a/src/test/regress/expected/opr_sanity.out +++ b/src/test/regress/expected/opr_sanity.out @@ -735,7 +735,7 @@ WHERE aggfnoid = 0 OR aggtransfn = 0 OR aggkind NOT IN ('n', 'o', 'h') OR aggnumdirectargs < 0 OR (aggkind = 'n' AND aggnumdirectargs > 0) OR - aggtranstype = 0 OR aggtransspace < 0; + aggtranstype = 0 OR aggtransspace < 0 OR aggmtransspace < 0; ctid | aggfnoid ------+---------- (0 rows) @@ -827,6 +827,126 @@ WHERE a.aggfnoid = p.oid AND ----------+---------+-----+--------- (0 rows) +-- Check for inconsistent specifications of moving-aggregate columns. +SELECT ctid, aggfnoid::oid +FROM pg_aggregate as p1 +WHERE aggmtranstype != 0 AND + (aggmtransfn = 0 OR aggminvtransfn = 0); + ctid | aggfnoid +------+---------- +(0 rows) + +SELECT ctid, aggfnoid::oid +FROM pg_aggregate as p1 +WHERE aggmtranstype = 0 AND + (aggmtransfn != 0 OR aggminvtransfn != 0 OR aggmfinalfn != 0 OR + aggmtransspace != 0 OR aggminitval IS NOT NULL); + ctid | aggfnoid +------+---------- +(0 rows) + +-- If there is no mfinalfn then the output type must be the mtranstype. 
+SELECT a.aggfnoid::oid, p.proname +FROM pg_aggregate as a, pg_proc as p +WHERE a.aggfnoid = p.oid AND + a.aggmtransfn != 0 AND + a.aggmfinalfn = 0 AND p.prorettype != a.aggmtranstype; + aggfnoid | proname +----------+--------- +(0 rows) + +-- Cross-check mtransfn (if present) against its entry in pg_proc. +SELECT a.aggfnoid::oid, p.proname, ptr.oid, ptr.proname +FROM pg_aggregate AS a, pg_proc AS p, pg_proc AS ptr +WHERE a.aggfnoid = p.oid AND + a.aggmtransfn = ptr.oid AND + (ptr.proretset + OR NOT (ptr.pronargs = + CASE WHEN a.aggkind = 'n' THEN p.pronargs + 1 + ELSE greatest(p.pronargs - a.aggnumdirectargs, 1) + 1 END) + OR NOT physically_coercible(ptr.prorettype, a.aggmtranstype) + OR NOT physically_coercible(a.aggmtranstype, ptr.proargtypes[0]) + OR (p.pronargs > 0 AND + NOT physically_coercible(p.proargtypes[0], ptr.proargtypes[1])) + OR (p.pronargs > 1 AND + NOT physically_coercible(p.proargtypes[1], ptr.proargtypes[2])) + OR (p.pronargs > 2 AND + NOT physically_coercible(p.proargtypes[2], ptr.proargtypes[3])) + -- we could carry the check further, but 3 args is enough for now + ); + aggfnoid | proname | oid | proname +----------+---------+-----+--------- +(0 rows) + +-- Cross-check minvtransfn (if present) against its entry in pg_proc. 
+SELECT a.aggfnoid::oid, p.proname, ptr.oid, ptr.proname +FROM pg_aggregate AS a, pg_proc AS p, pg_proc AS ptr +WHERE a.aggfnoid = p.oid AND + a.aggminvtransfn = ptr.oid AND + (ptr.proretset + OR NOT (ptr.pronargs = + CASE WHEN a.aggkind = 'n' THEN p.pronargs + 1 + ELSE greatest(p.pronargs - a.aggnumdirectargs, 1) + 1 END) + OR NOT physically_coercible(ptr.prorettype, a.aggmtranstype) + OR NOT physically_coercible(a.aggmtranstype, ptr.proargtypes[0]) + OR (p.pronargs > 0 AND + NOT physically_coercible(p.proargtypes[0], ptr.proargtypes[1])) + OR (p.pronargs > 1 AND + NOT physically_coercible(p.proargtypes[1], ptr.proargtypes[2])) + OR (p.pronargs > 2 AND + NOT physically_coercible(p.proargtypes[2], ptr.proargtypes[3])) + -- we could carry the check further, but 3 args is enough for now + ); + aggfnoid | proname | oid | proname +----------+---------+-----+--------- +(0 rows) + +-- Cross-check mfinalfn (if present) against its entry in pg_proc. +SELECT a.aggfnoid::oid, p.proname, pfn.oid, pfn.proname +FROM pg_aggregate AS a, pg_proc AS p, pg_proc AS pfn +WHERE a.aggfnoid = p.oid AND + a.aggmfinalfn = pfn.oid AND + (pfn.proretset OR + NOT binary_coercible(pfn.prorettype, p.prorettype) OR + NOT binary_coercible(a.aggmtranstype, pfn.proargtypes[0]) OR + CASE WHEN a.aggkind = 'n' THEN pfn.pronargs != 1 + ELSE pfn.pronargs != p.pronargs + 1 + OR (p.pronargs > 0 AND + NOT binary_coercible(p.proargtypes[0], pfn.proargtypes[1])) + OR (p.pronargs > 1 AND + NOT binary_coercible(p.proargtypes[1], pfn.proargtypes[2])) + OR (p.pronargs > 2 AND + NOT binary_coercible(p.proargtypes[2], pfn.proargtypes[3])) + -- we could carry the check further, but 3 args is enough for now + END); + aggfnoid | proname | oid | proname +----------+---------+-----+--------- +(0 rows) + +-- If mtransfn is strict then either minitval should be non-NULL, or +-- input type should match mtranstype so that the first non-null input +-- can be assigned as the state value. 
+SELECT a.aggfnoid::oid, p.proname, ptr.oid, ptr.proname +FROM pg_aggregate AS a, pg_proc AS p, pg_proc AS ptr +WHERE a.aggfnoid = p.oid AND + a.aggmtransfn = ptr.oid AND ptr.proisstrict AND + a.aggminitval IS NULL AND + NOT binary_coercible(p.proargtypes[0], a.aggmtranstype); + aggfnoid | proname | oid | proname +----------+---------+-----+--------- +(0 rows) + +-- transfn and mtransfn should have same strictness setting. +SELECT a.aggfnoid::oid, p.proname, ptr.oid, ptr.proname, mptr.oid, mptr.proname +FROM pg_aggregate AS a, pg_proc AS p, pg_proc AS ptr, pg_proc AS mptr +WHERE a.aggfnoid = p.oid AND + a.aggtransfn = ptr.oid AND + a.aggmtransfn = mptr.oid AND + ptr.proisstrict != mptr.proisstrict; + aggfnoid | proname | oid | proname | oid | proname +----------+---------+-----+---------+-----+--------- +(0 rows) + -- Cross-check aggsortop (if present) against pg_operator. -- We expect to find entries for bool_and, bool_or, every, max, and min. SELECT DISTINCT proname, oprname diff --git a/src/test/regress/expected/window.out b/src/test/regress/expected/window.out index 0f21fcb01d..d9cb0addb3 100644 --- a/src/test/regress/expected/window.out +++ b/src/test/regress/expected/window.out @@ -1071,3 +1071,226 @@ SELECT nth_value_def(ten) OVER (PARTITION BY four), ten, four 1 | 3 | 3 (10 rows) +-- +-- Test the basic moving-aggregate machinery +-- +-- create aggregates that record the series of transform calls (these are +-- intentionally not true inverses) +CREATE FUNCTION logging_sfunc_nonstrict(text, anyelement) RETURNS text AS +$$ SELECT COALESCE($1, '') || '*' || quote_nullable($2) $$ +LANGUAGE SQL IMMUTABLE; +CREATE FUNCTION logging_msfunc_nonstrict(text, anyelement) RETURNS text AS +$$ SELECT COALESCE($1, '') || '+' || quote_nullable($2) $$ +LANGUAGE SQL IMMUTABLE; +CREATE FUNCTION logging_minvfunc_nonstrict(text, anyelement) RETURNS text AS +$$ SELECT $1 || '-' || quote_nullable($2) $$ +LANGUAGE SQL IMMUTABLE; +CREATE AGGREGATE logging_agg_nonstrict (anyelement) 
+( + stype = text, + sfunc = logging_sfunc_nonstrict, + mstype = text, + msfunc = logging_msfunc_nonstrict, + minvfunc = logging_minvfunc_nonstrict +); +CREATE AGGREGATE logging_agg_nonstrict_initcond (anyelement) +( + stype = text, + sfunc = logging_sfunc_nonstrict, + mstype = text, + msfunc = logging_msfunc_nonstrict, + minvfunc = logging_minvfunc_nonstrict, + initcond = 'I', + minitcond = 'MI' +); +CREATE FUNCTION logging_sfunc_strict(text, anyelement) RETURNS text AS +$$ SELECT $1 || '*' || quote_nullable($2) $$ +LANGUAGE SQL STRICT IMMUTABLE; +CREATE FUNCTION logging_msfunc_strict(text, anyelement) RETURNS text AS +$$ SELECT $1 || '+' || quote_nullable($2) $$ +LANGUAGE SQL STRICT IMMUTABLE; +CREATE FUNCTION logging_minvfunc_strict(text, anyelement) RETURNS text AS +$$ SELECT $1 || '-' || quote_nullable($2) $$ +LANGUAGE SQL STRICT IMMUTABLE; +CREATE AGGREGATE logging_agg_strict (text) +( + stype = text, + sfunc = logging_sfunc_strict, + mstype = text, + msfunc = logging_msfunc_strict, + minvfunc = logging_minvfunc_strict +); +CREATE AGGREGATE logging_agg_strict_initcond (anyelement) +( + stype = text, + sfunc = logging_sfunc_strict, + mstype = text, + msfunc = logging_msfunc_strict, + minvfunc = logging_minvfunc_strict, + initcond = 'I', + minitcond = 'MI' +); +-- test strict and non-strict cases +SELECT + p::text || ',' || i::text || ':' || COALESCE(v::text, 'NULL') AS row, + logging_agg_nonstrict(v) over wnd as nstrict, + logging_agg_nonstrict_initcond(v) over wnd as nstrict_init, + logging_agg_strict(v::text) over wnd as strict, + logging_agg_strict_initcond(v) over wnd as strict_init +FROM (VALUES + (1, 1, NULL), + (1, 2, 'a'), + (1, 3, 'b'), + (1, 4, NULL), + (1, 5, NULL), + (1, 6, 'c'), + (2, 1, NULL), + (2, 2, 'x'), + (3, 1, 'z') +) AS t(p, i, v) +WINDOW wnd AS (PARTITION BY P ORDER BY i ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) +ORDER BY p, i; + row | nstrict | nstrict_init | strict | strict_init 
+----------+-----------------------------------------------+-------------------------------------------------+-----------+---------------- + 1,1:NULL | +NULL | MI+NULL | | MI + 1,2:a | +NULL+'a' | MI+NULL+'a' | a | MI+'a' + 1,3:b | +NULL+'a'-NULL+'b' | MI+NULL+'a'-NULL+'b' | a+'b' | MI+'a'+'b' + 1,4:NULL | +NULL+'a'-NULL+'b'-'a'+NULL | MI+NULL+'a'-NULL+'b'-'a'+NULL | a+'b'-'a' | MI+'a'+'b'-'a' + 1,5:NULL | +NULL+'a'-NULL+'b'-'a'+NULL-'b'+NULL | MI+NULL+'a'-NULL+'b'-'a'+NULL-'b'+NULL | | MI + 1,6:c | +NULL+'a'-NULL+'b'-'a'+NULL-'b'+NULL-NULL+'c' | MI+NULL+'a'-NULL+'b'-'a'+NULL-'b'+NULL-NULL+'c' | c | MI+'c' + 2,1:NULL | +NULL | MI+NULL | | MI + 2,2:x | +NULL+'x' | MI+NULL+'x' | x | MI+'x' + 3,1:z | +'z' | MI+'z' | z | MI+'z' +(9 rows) + +-- and again, but with filter +SELECT + p::text || ',' || i::text || ':' || + CASE WHEN f THEN COALESCE(v::text, 'NULL') ELSE '-' END as row, + logging_agg_nonstrict(v) filter(where f) over wnd as nstrict_filt, + logging_agg_nonstrict_initcond(v) filter(where f) over wnd as nstrict_init_filt, + logging_agg_strict(v::text) filter(where f) over wnd as strict_filt, + logging_agg_strict_initcond(v) filter(where f) over wnd as strict_init_filt +FROM (VALUES + (1, 1, true, NULL), + (1, 2, false, 'a'), + (1, 3, true, 'b'), + (1, 4, false, NULL), + (1, 5, false, NULL), + (1, 6, false, 'c'), + (2, 1, false, NULL), + (2, 2, true, 'x'), + (3, 1, true, 'z') +) AS t(p, i, f, v) +WINDOW wnd AS (PARTITION BY p ORDER BY i ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) +ORDER BY p, i; + row | nstrict_filt | nstrict_init_filt | strict_filt | strict_init_filt +----------+--------------+-------------------+-------------+------------------ + 1,1:NULL | +NULL | MI+NULL | | MI + 1,2:- | +NULL | MI+NULL | | MI + 1,3:b | +'b' | MI+'b' | b | MI+'b' + 1,4:- | +'b' | MI+'b' | b | MI+'b' + 1,5:- | | MI | | MI + 1,6:- | | MI | | MI + 2,1:- | | MI | | MI + 2,2:x | +'x' | MI+'x' | x | MI+'x' + 3,1:z | +'z' | MI+'z' | z | MI+'z' +(9 rows) + +-- test that volatile 
arguments disable moving-aggregate mode +SELECT + i::text || ':' || COALESCE(v::text, 'NULL') as row, + logging_agg_strict(v::text) + over wnd as inverse, + logging_agg_strict(v::text || CASE WHEN random() < 0 then '?' ELSE '' END) + over wnd as noinverse +FROM (VALUES + (1, 'a'), + (2, 'b'), + (3, 'c') +) AS t(i, v) +WINDOW wnd AS (ORDER BY i ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) +ORDER BY i; + row | inverse | noinverse +-----+---------------+----------- + 1:a | a | a + 2:b | a+'b' | a*'b' + 3:c | a+'b'-'a'+'c' | b*'c' +(3 rows) + +SELECT + i::text || ':' || COALESCE(v::text, 'NULL') as row, + logging_agg_strict(v::text) filter(where true) + over wnd as inverse, + logging_agg_strict(v::text) filter(where random() >= 0) + over wnd as noinverse +FROM (VALUES + (1, 'a'), + (2, 'b'), + (3, 'c') +) AS t(i, v) +WINDOW wnd AS (ORDER BY i ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) +ORDER BY i; + row | inverse | noinverse +-----+---------------+----------- + 1:a | a | a + 2:b | a+'b' | a*'b' + 3:c | a+'b'-'a'+'c' | b*'c' +(3 rows) + +-- test that non-overlapping windows don't use inverse transitions +SELECT + logging_agg_strict(v::text) OVER wnd +FROM (VALUES + (1, 'a'), + (2, 'b'), + (3, 'c') +) AS t(i, v) +WINDOW wnd AS (ORDER BY i ROWS BETWEEN CURRENT ROW AND CURRENT ROW) +ORDER BY i; + logging_agg_strict +-------------------- + a + b + c +(3 rows) + +-- test that returning NULL from the inverse transition functions +-- restarts the aggregation from scratch. The second aggregate is supposed +-- to test cases where only some aggregates restart, the third one checks +-- that one aggregate restarting doesn't cause others to restart. 
+CREATE FUNCTION sum_int_randrestart_minvfunc(int4, int4) RETURNS int4 AS +$$ SELECT CASE WHEN random() < 0.2 THEN NULL ELSE $1 - $2 END $$ +LANGUAGE SQL STRICT; +CREATE AGGREGATE sum_int_randomrestart (int4) +( + stype = int4, + sfunc = int4pl, + mstype = int4, + msfunc = int4pl, + minvfunc = sum_int_randrestart_minvfunc +); +WITH +vs AS ( + SELECT i, (random() * 100)::int4 AS v + FROM generate_series(1, 100) AS i +), +sum_following AS ( + SELECT i, SUM(v) OVER + (ORDER BY i DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS s + FROM vs +) +SELECT DISTINCT + sum_following.s = sum_int_randomrestart(v) OVER fwd AS eq1, + -sum_following.s = sum_int_randomrestart(-v) OVER fwd AS eq2, + 100*3+(vs.i-1)*3 = length(logging_agg_nonstrict(''::text) OVER fwd) AS eq3 +FROM vs +JOIN sum_following ON sum_following.i = vs.i +WINDOW fwd AS ( + ORDER BY vs.i ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING +); + eq1 | eq2 | eq3 +-----+-----+----- + t | t | t +(1 row) + diff --git a/src/test/regress/sql/create_aggregate.sql b/src/test/regress/sql/create_aggregate.sql index c76882a398..2b502aca3e 100644 --- a/src/test/regress/sql/create_aggregate.sql +++ b/src/test/regress/sql/create_aggregate.sql @@ -101,3 +101,44 @@ alter aggregate my_rank(VARIADIC "any" ORDER BY VARIADIC "any") rename to test_rank; \da test_* + +-- moving-aggregate options + +CREATE AGGREGATE sumdouble (float8) +( + stype = float8, + sfunc = float8pl, + mstype = float8, + msfunc = float8pl, + minvfunc = float8mi +); + +-- invalid: nonstrict inverse with strict forward function + +CREATE FUNCTION float8mi_n(float8, float8) RETURNS float8 AS +$$ SELECT $1 - $2; $$ +LANGUAGE SQL; + +CREATE AGGREGATE invalidsumdouble (float8) +( + stype = float8, + sfunc = float8pl, + mstype = float8, + msfunc = float8pl, + minvfunc = float8mi_n +); + +-- invalid: non-matching result types + +CREATE FUNCTION float8mi_int(float8, float8) RETURNS int AS +$$ SELECT CAST($1 - $2 AS INT); $$ +LANGUAGE SQL; + +CREATE AGGREGATE 
wrongreturntype (float8) +( + stype = float8, + sfunc = float8pl, + mstype = float8, + msfunc = float8pl, + minvfunc = float8mi_int +); diff --git a/src/test/regress/sql/opr_sanity.sql b/src/test/regress/sql/opr_sanity.sql index ad37178924..22998a553c 100644 --- a/src/test/regress/sql/opr_sanity.sql +++ b/src/test/regress/sql/opr_sanity.sql @@ -592,7 +592,7 @@ WHERE aggfnoid = 0 OR aggtransfn = 0 OR aggkind NOT IN ('n', 'o', 'h') OR aggnumdirectargs < 0 OR (aggkind = 'n' AND aggnumdirectargs > 0) OR - aggtranstype = 0 OR aggtransspace < 0; + aggtranstype = 0 OR aggtransspace < 0 OR aggmtransspace < 0; -- Make sure the matching pg_proc entry is sensible, too. @@ -668,6 +668,107 @@ WHERE a.aggfnoid = p.oid AND a.agginitval IS NULL AND NOT binary_coercible(p.proargtypes[0], a.aggtranstype); +-- Check for inconsistent specifications of moving-aggregate columns. + +SELECT ctid, aggfnoid::oid +FROM pg_aggregate as p1 +WHERE aggmtranstype != 0 AND + (aggmtransfn = 0 OR aggminvtransfn = 0); + +SELECT ctid, aggfnoid::oid +FROM pg_aggregate as p1 +WHERE aggmtranstype = 0 AND + (aggmtransfn != 0 OR aggminvtransfn != 0 OR aggmfinalfn != 0 OR + aggmtransspace != 0 OR aggminitval IS NOT NULL); + +-- If there is no mfinalfn then the output type must be the mtranstype. + +SELECT a.aggfnoid::oid, p.proname +FROM pg_aggregate as a, pg_proc as p +WHERE a.aggfnoid = p.oid AND + a.aggmtransfn != 0 AND + a.aggmfinalfn = 0 AND p.prorettype != a.aggmtranstype; + +-- Cross-check mtransfn (if present) against its entry in pg_proc. 
+SELECT a.aggfnoid::oid, p.proname, ptr.oid, ptr.proname +FROM pg_aggregate AS a, pg_proc AS p, pg_proc AS ptr +WHERE a.aggfnoid = p.oid AND + a.aggmtransfn = ptr.oid AND + (ptr.proretset + OR NOT (ptr.pronargs = + CASE WHEN a.aggkind = 'n' THEN p.pronargs + 1 + ELSE greatest(p.pronargs - a.aggnumdirectargs, 1) + 1 END) + OR NOT physically_coercible(ptr.prorettype, a.aggmtranstype) + OR NOT physically_coercible(a.aggmtranstype, ptr.proargtypes[0]) + OR (p.pronargs > 0 AND + NOT physically_coercible(p.proargtypes[0], ptr.proargtypes[1])) + OR (p.pronargs > 1 AND + NOT physically_coercible(p.proargtypes[1], ptr.proargtypes[2])) + OR (p.pronargs > 2 AND + NOT physically_coercible(p.proargtypes[2], ptr.proargtypes[3])) + -- we could carry the check further, but 3 args is enough for now + ); + +-- Cross-check minvtransfn (if present) against its entry in pg_proc. +SELECT a.aggfnoid::oid, p.proname, ptr.oid, ptr.proname +FROM pg_aggregate AS a, pg_proc AS p, pg_proc AS ptr +WHERE a.aggfnoid = p.oid AND + a.aggminvtransfn = ptr.oid AND + (ptr.proretset + OR NOT (ptr.pronargs = + CASE WHEN a.aggkind = 'n' THEN p.pronargs + 1 + ELSE greatest(p.pronargs - a.aggnumdirectargs, 1) + 1 END) + OR NOT physically_coercible(ptr.prorettype, a.aggmtranstype) + OR NOT physically_coercible(a.aggmtranstype, ptr.proargtypes[0]) + OR (p.pronargs > 0 AND + NOT physically_coercible(p.proargtypes[0], ptr.proargtypes[1])) + OR (p.pronargs > 1 AND + NOT physically_coercible(p.proargtypes[1], ptr.proargtypes[2])) + OR (p.pronargs > 2 AND + NOT physically_coercible(p.proargtypes[2], ptr.proargtypes[3])) + -- we could carry the check further, but 3 args is enough for now + ); + +-- Cross-check mfinalfn (if present) against its entry in pg_proc. 
+ +SELECT a.aggfnoid::oid, p.proname, pfn.oid, pfn.proname +FROM pg_aggregate AS a, pg_proc AS p, pg_proc AS pfn +WHERE a.aggfnoid = p.oid AND + a.aggmfinalfn = pfn.oid AND + (pfn.proretset OR + NOT binary_coercible(pfn.prorettype, p.prorettype) OR + NOT binary_coercible(a.aggmtranstype, pfn.proargtypes[0]) OR + CASE WHEN a.aggkind = 'n' THEN pfn.pronargs != 1 + ELSE pfn.pronargs != p.pronargs + 1 + OR (p.pronargs > 0 AND + NOT binary_coercible(p.proargtypes[0], pfn.proargtypes[1])) + OR (p.pronargs > 1 AND + NOT binary_coercible(p.proargtypes[1], pfn.proargtypes[2])) + OR (p.pronargs > 2 AND + NOT binary_coercible(p.proargtypes[2], pfn.proargtypes[3])) + -- we could carry the check further, but 3 args is enough for now + END); + +-- If mtransfn is strict then either minitval should be non-NULL, or +-- input type should match mtranstype so that the first non-null input +-- can be assigned as the state value. + +SELECT a.aggfnoid::oid, p.proname, ptr.oid, ptr.proname +FROM pg_aggregate AS a, pg_proc AS p, pg_proc AS ptr +WHERE a.aggfnoid = p.oid AND + a.aggmtransfn = ptr.oid AND ptr.proisstrict AND + a.aggminitval IS NULL AND + NOT binary_coercible(p.proargtypes[0], a.aggmtranstype); + +-- transfn and mtransfn should have same strictness setting. + +SELECT a.aggfnoid::oid, p.proname, ptr.oid, ptr.proname, mptr.oid, mptr.proname +FROM pg_aggregate AS a, pg_proc AS p, pg_proc AS ptr, pg_proc AS mptr +WHERE a.aggfnoid = p.oid AND + a.aggtransfn = ptr.oid AND + a.aggmtransfn = mptr.oid AND + ptr.proisstrict != mptr.proisstrict; + -- Cross-check aggsortop (if present) against pg_operator. -- We expect to find entries for bool_and, bool_or, every, max, and min. 
diff --git a/src/test/regress/sql/window.sql b/src/test/regress/sql/window.sql index 7297e62618..5bae12bd33 100644 --- a/src/test/regress/sql/window.sql +++ b/src/test/regress/sql/window.sql @@ -284,3 +284,195 @@ SELECT nth_value_def(n := 2, val := ten) OVER (PARTITION BY four), ten, four SELECT nth_value_def(ten) OVER (PARTITION BY four), ten, four FROM (SELECT * FROM tenk1 WHERE unique2 < 10 ORDER BY four, ten) s; + +-- +-- Test the basic moving-aggregate machinery +-- + +-- create aggregates that record the series of transform calls (these are +-- intentionally not true inverses) + +CREATE FUNCTION logging_sfunc_nonstrict(text, anyelement) RETURNS text AS +$$ SELECT COALESCE($1, '') || '*' || quote_nullable($2) $$ +LANGUAGE SQL IMMUTABLE; + +CREATE FUNCTION logging_msfunc_nonstrict(text, anyelement) RETURNS text AS +$$ SELECT COALESCE($1, '') || '+' || quote_nullable($2) $$ +LANGUAGE SQL IMMUTABLE; + +CREATE FUNCTION logging_minvfunc_nonstrict(text, anyelement) RETURNS text AS +$$ SELECT $1 || '-' || quote_nullable($2) $$ +LANGUAGE SQL IMMUTABLE; + +CREATE AGGREGATE logging_agg_nonstrict (anyelement) +( + stype = text, + sfunc = logging_sfunc_nonstrict, + mstype = text, + msfunc = logging_msfunc_nonstrict, + minvfunc = logging_minvfunc_nonstrict +); + +CREATE AGGREGATE logging_agg_nonstrict_initcond (anyelement) +( + stype = text, + sfunc = logging_sfunc_nonstrict, + mstype = text, + msfunc = logging_msfunc_nonstrict, + minvfunc = logging_minvfunc_nonstrict, + initcond = 'I', + minitcond = 'MI' +); + +CREATE FUNCTION logging_sfunc_strict(text, anyelement) RETURNS text AS +$$ SELECT $1 || '*' || quote_nullable($2) $$ +LANGUAGE SQL STRICT IMMUTABLE; + +CREATE FUNCTION logging_msfunc_strict(text, anyelement) RETURNS text AS +$$ SELECT $1 || '+' || quote_nullable($2) $$ +LANGUAGE SQL STRICT IMMUTABLE; + +CREATE FUNCTION logging_minvfunc_strict(text, anyelement) RETURNS text AS +$$ SELECT $1 || '-' || quote_nullable($2) $$ +LANGUAGE SQL STRICT IMMUTABLE; + +CREATE 
AGGREGATE logging_agg_strict (text) +( + stype = text, + sfunc = logging_sfunc_strict, + mstype = text, + msfunc = logging_msfunc_strict, + minvfunc = logging_minvfunc_strict +); + +CREATE AGGREGATE logging_agg_strict_initcond (anyelement) +( + stype = text, + sfunc = logging_sfunc_strict, + mstype = text, + msfunc = logging_msfunc_strict, + minvfunc = logging_minvfunc_strict, + initcond = 'I', + minitcond = 'MI' +); + +-- test strict and non-strict cases +SELECT + p::text || ',' || i::text || ':' || COALESCE(v::text, 'NULL') AS row, + logging_agg_nonstrict(v) over wnd as nstrict, + logging_agg_nonstrict_initcond(v) over wnd as nstrict_init, + logging_agg_strict(v::text) over wnd as strict, + logging_agg_strict_initcond(v) over wnd as strict_init +FROM (VALUES + (1, 1, NULL), + (1, 2, 'a'), + (1, 3, 'b'), + (1, 4, NULL), + (1, 5, NULL), + (1, 6, 'c'), + (2, 1, NULL), + (2, 2, 'x'), + (3, 1, 'z') +) AS t(p, i, v) +WINDOW wnd AS (PARTITION BY P ORDER BY i ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) +ORDER BY p, i; + +-- and again, but with filter +SELECT + p::text || ',' || i::text || ':' || + CASE WHEN f THEN COALESCE(v::text, 'NULL') ELSE '-' END as row, + logging_agg_nonstrict(v) filter(where f) over wnd as nstrict_filt, + logging_agg_nonstrict_initcond(v) filter(where f) over wnd as nstrict_init_filt, + logging_agg_strict(v::text) filter(where f) over wnd as strict_filt, + logging_agg_strict_initcond(v) filter(where f) over wnd as strict_init_filt +FROM (VALUES + (1, 1, true, NULL), + (1, 2, false, 'a'), + (1, 3, true, 'b'), + (1, 4, false, NULL), + (1, 5, false, NULL), + (1, 6, false, 'c'), + (2, 1, false, NULL), + (2, 2, true, 'x'), + (3, 1, true, 'z') +) AS t(p, i, f, v) +WINDOW wnd AS (PARTITION BY p ORDER BY i ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) +ORDER BY p, i; + +-- test that volatile arguments disable moving-aggregate mode +SELECT + i::text || ':' || COALESCE(v::text, 'NULL') as row, + logging_agg_strict(v::text) + over wnd as inverse, + 
logging_agg_strict(v::text || CASE WHEN random() < 0 then '?' ELSE '' END) + over wnd as noinverse +FROM (VALUES + (1, 'a'), + (2, 'b'), + (3, 'c') +) AS t(i, v) +WINDOW wnd AS (ORDER BY i ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) +ORDER BY i; + +SELECT + i::text || ':' || COALESCE(v::text, 'NULL') as row, + logging_agg_strict(v::text) filter(where true) + over wnd as inverse, + logging_agg_strict(v::text) filter(where random() >= 0) + over wnd as noinverse +FROM (VALUES + (1, 'a'), + (2, 'b'), + (3, 'c') +) AS t(i, v) +WINDOW wnd AS (ORDER BY i ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) +ORDER BY i; + +-- test that non-overlapping windows don't use inverse transitions +SELECT + logging_agg_strict(v::text) OVER wnd +FROM (VALUES + (1, 'a'), + (2, 'b'), + (3, 'c') +) AS t(i, v) +WINDOW wnd AS (ORDER BY i ROWS BETWEEN CURRENT ROW AND CURRENT ROW) +ORDER BY i; + +-- test that returning NULL from the inverse transition functions +-- restarts the aggregation from scratch. The second aggregate is supposed +-- to test cases where only some aggregates restart, the third one checks +-- that one aggregate restarting doesn't cause others to restart. 
+ +CREATE FUNCTION sum_int_randrestart_minvfunc(int4, int4) RETURNS int4 AS +$$ SELECT CASE WHEN random() < 0.2 THEN NULL ELSE $1 - $2 END $$ +LANGUAGE SQL STRICT; + +CREATE AGGREGATE sum_int_randomrestart (int4) +( + stype = int4, + sfunc = int4pl, + mstype = int4, + msfunc = int4pl, + minvfunc = sum_int_randrestart_minvfunc +); + +WITH +vs AS ( + SELECT i, (random() * 100)::int4 AS v + FROM generate_series(1, 100) AS i +), +sum_following AS ( + SELECT i, SUM(v) OVER + (ORDER BY i DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS s + FROM vs +) +SELECT DISTINCT + sum_following.s = sum_int_randomrestart(v) OVER fwd AS eq1, + -sum_following.s = sum_int_randomrestart(-v) OVER fwd AS eq2, + 100*3+(vs.i-1)*3 = length(logging_agg_nonstrict(''::text) OVER fwd) AS eq3 +FROM vs +JOIN sum_following ON sum_following.i = vs.i +WINDOW fwd AS ( + ORDER BY vs.i ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING +);