From 7012b132d07c2b4ea15b0b3cb1ea9f3278801d98 Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Fri, 21 Oct 2016 09:54:29 -0400 Subject: [PATCH] postgres_fdw: Push down aggregates to remote servers. Now that the upper planner uses paths, and now that we have proper hooks to inject paths into the upper planning process, it's possible for foreign data wrappers to arrange to push aggregates to the remote side instead of fetching all of the rows and aggregating them locally. This figures to be a massive win for performance, so teach postgres_fdw to do it. Jeevan Chalke and Ashutosh Bapat. Reviewed by Ashutosh Bapat with additional testing by Prabhat Sahu. Various mostly cosmetic changes by me. --- contrib/postgres_fdw/deparse.c | 568 +++++++- .../postgres_fdw/expected/postgres_fdw.out | 1219 ++++++++++++++++- contrib/postgres_fdw/postgres_fdw.c | 494 ++++++- contrib/postgres_fdw/postgres_fdw.h | 5 +- contrib/postgres_fdw/sql/postgres_fdw.sql | 305 +++++ src/backend/optimizer/plan/createplan.c | 11 +- 6 files changed, 2452 insertions(+), 150 deletions(-) diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c index 691658f099..8da8c114a8 100644 --- a/contrib/postgres_fdw/deparse.c +++ b/contrib/postgres_fdw/deparse.c @@ -38,6 +38,7 @@ #include "access/heapam.h" #include "access/htup_details.h" #include "access/sysattr.h" +#include "catalog/pg_aggregate.h" #include "catalog/pg_collation.h" #include "catalog/pg_namespace.h" #include "catalog/pg_operator.h" @@ -56,6 +57,7 @@ #include "utils/lsyscache.h" #include "utils/rel.h" #include "utils/syscache.h" +#include "utils/typcache.h" /* @@ -65,6 +67,8 @@ typedef struct foreign_glob_cxt { PlannerInfo *root; /* global planner state */ RelOptInfo *foreignrel; /* the foreign relation we are planning for */ + Relids relids; /* relids of base relations in the underlying + * scan */ } foreign_glob_cxt; /* @@ -94,6 +98,9 @@ typedef struct deparse_expr_cxt { PlannerInfo *root; /* global planner state */ RelOptInfo *foreignrel; /* the foreign relation we are planning for */ + RelOptInfo *scanrel; /* the underlying scan relation. Same as + * foreignrel, when that represents a join or + * a base relation. */ StringInfo buf; /* output buffer to append to */ List **params_list; /* exprs that will become remote Params */ } deparse_expr_cxt; @@ -135,7 +142,7 @@ static void deparseColumnRef(StringInfo buf, int varno, int varattno, static void deparseRelation(StringInfo buf, Relation rel); static void deparseExpr(Expr *expr, deparse_expr_cxt *context); static void deparseVar(Var *node, deparse_expr_cxt *context); -static void deparseConst(Const *node, deparse_expr_cxt *context); +static void deparseConst(Const *node, deparse_expr_cxt *context, int showtype); static void deparseParam(Param *node, deparse_expr_cxt *context); static void deparseArrayRef(ArrayRef *node, deparse_expr_cxt *context); static void deparseFuncExpr(FuncExpr *node, deparse_expr_cxt *context); @@ -159,6 +166,14 @@ static void appendOrderByClause(List *pathkeys, deparse_expr_cxt *context); static void appendConditions(List *exprs, deparse_expr_cxt *context); static void deparseFromExprForRel(StringInfo buf, PlannerInfo *root, RelOptInfo *joinrel, bool use_alias, List **params_list); +static void deparseFromExpr(List *quals, deparse_expr_cxt *context); +static void deparseAggref(Aggref *node, deparse_expr_cxt *context); +static void appendGroupByClause(List *tlist, deparse_expr_cxt *context); +static void appendAggOrderBy(List *orderList, List *targetList, + deparse_expr_cxt *context); +static void appendFunctionName(Oid funcid, deparse_expr_cxt *context); +static Node *deparseSortGroupClause(Index ref, List *tlist, + deparse_expr_cxt *context); /* @@ -200,6 +215,7 @@ is_foreign_expr(PlannerInfo *root, { foreign_glob_cxt glob_cxt; foreign_loc_cxt loc_cxt; + PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) (baserel->fdw_private); /* * Check that the expression consists of nodes that are safe to execute @@ -207,6 +223,16 @@ is_foreign_expr(PlannerInfo *root, */ glob_cxt.root = root; glob_cxt.foreignrel = baserel; + + /* + * For an upper relation, use relids from its underneath scan relation, + * because the upperrel's own relids currently aren't set to anything + * meaningful by the core code. For other relation, use their own relids. + */ + if (baserel->reloptkind == RELOPT_UPPER_REL) + glob_cxt.relids = fpinfo->outerrel->relids; + else + glob_cxt.relids = baserel->relids; loc_cxt.collation = InvalidOid; loc_cxt.state = FDW_COLLATE_NONE; if (!foreign_expr_walker((Node *) expr, &glob_cxt, &loc_cxt)) @@ -281,7 +307,7 @@ foreign_expr_walker(Node *node, * Param's collation, ie it's not safe for it to have a * non-default collation. */ - if (bms_is_member(var->varno, glob_cxt->foreignrel->relids) && + if (bms_is_member(var->varno, glob_cxt->relids) && var->varlevelsup == 0) { /* Var belongs to foreign table */ @@ -631,6 +657,106 @@ foreign_expr_walker(Node *node, check_type = false; } break; + case T_Aggref: + { + Aggref *agg = (Aggref *) node; + ListCell *lc; + + /* Not safe to pushdown when not in grouping context */ + if (glob_cxt->foreignrel->reloptkind != RELOPT_UPPER_REL) + return false; + + /* Only non-split aggregates are pushable. */ + if (agg->aggsplit != AGGSPLIT_SIMPLE) + return false; + + /* As usual, it must be shippable. */ + if (!is_shippable(agg->aggfnoid, ProcedureRelationId, fpinfo)) + return false; + + /* + * Recurse to input args. aggdirectargs, aggorder and + * aggdistinct are all present in args, so no need to check + * their shippability explicitly. + */ + foreach(lc, agg->args) + { + Node *n = (Node *) lfirst(lc); + + /* If TargetEntry, extract the expression from it */ + if (IsA(n, TargetEntry)) + { + TargetEntry *tle = (TargetEntry *) n; + + n = (Node *) tle->expr; + } + + if (!foreign_expr_walker(n, glob_cxt, &inner_cxt)) + return false; + } + + /* + * For aggorder elements, check whether the sort operator, if + * specified, is shippable or not. + */ + if (agg->aggorder) + { + ListCell *lc; + + foreach(lc, agg->aggorder) + { + SortGroupClause *srt = (SortGroupClause *) lfirst(lc); + Oid sortcoltype; + TypeCacheEntry *typentry; + TargetEntry *tle; + + tle = get_sortgroupref_tle(srt->tleSortGroupRef, + agg->args); + sortcoltype = exprType((Node *) tle->expr); + typentry = lookup_type_cache(sortcoltype, + TYPECACHE_LT_OPR | TYPECACHE_GT_OPR); + /* Check shippability of non-default sort operator. */ + if (srt->sortop != typentry->lt_opr && + srt->sortop != typentry->gt_opr && + !is_shippable(srt->sortop, OperatorRelationId, + fpinfo)) + return false; + } + } + + /* Check aggregate filter */ + if (!foreign_expr_walker((Node *) agg->aggfilter, + glob_cxt, &inner_cxt)) + return false; + + /* + * If aggregate's input collation is not derived from a + * foreign Var, it can't be sent to remote. + */ + if (agg->inputcollid == InvalidOid) + /* OK, inputs are all noncollatable */ ; + else if (inner_cxt.state != FDW_COLLATE_SAFE || + agg->inputcollid != inner_cxt.collation) + return false; + + /* + * Detect whether node is introducing a collation not derived + * from a foreign Var. (If so, we just mark it unsafe for now + * rather than immediately returning false, since the parent + * node might not care.) + */ + collation = agg->aggcollid; + if (collation == InvalidOid) + state = FDW_COLLATE_NONE; + else if (inner_cxt.state == FDW_COLLATE_SAFE && + collation == inner_cxt.collation) + state = FDW_COLLATE_SAFE; + else if (collation == DEFAULT_COLLATION_OID) + state = FDW_COLLATE_NONE; + else + state = FDW_COLLATE_UNSAFE; + } + break; default: /* @@ -720,7 +846,9 @@ deparse_type_name(Oid type_oid, int32 typemod) * Build the targetlist for given relation to be deparsed as SELECT clause. * * The output targetlist contains the columns that need to be fetched from the - * foreign server for the given relation. + * foreign server for the given relation. If foreignrel is an upper relation, + * then the output targetlist can also contains expressions to be evaluated on + * foreign server. */ List * build_tlist_to_deparse(RelOptInfo *foreignrel) @@ -728,6 +856,13 @@ build_tlist_to_deparse(RelOptInfo *foreignrel) List *tlist = NIL; PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private; + /* + * For an upper relation, we have already built the target list while + * checking shippability, so just return that. + */ + if (foreignrel->reloptkind == RELOPT_UPPER_REL) + return fpinfo->grouped_tlist; + /* * We require columns specified in foreignrel->reltarget->exprs and those * required for evaluating the local conditions. @@ -749,7 +884,8 @@ build_tlist_to_deparse(RelOptInfo *foreignrel) * For a base relation fpinfo->attrs_used is used to construct SELECT clause, * hence the tlist is ignored for a base relation. * - * remote_conds is the list of conditions to be deparsed as WHERE clause. + * remote_conds is the list of conditions to be deparsed into the WHERE clause + * (or, in the case of upper relations, into the HAVING clause). * * If params_list is not NULL, it receives a list of Params and other-relation * Vars used in the clauses; these values must be transmitted to the remote @@ -768,28 +904,58 @@ deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root, RelOptInfo *rel, List **retrieved_attrs, List **params_list) { deparse_expr_cxt context; + PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private; + List *quals; - /* We handle relations for foreign tables and joins between those */ + /* + * We handle relations for foreign tables, joins between those and upper + * relations. + */ Assert(rel->reloptkind == RELOPT_JOINREL || rel->reloptkind == RELOPT_BASEREL || - rel->reloptkind == RELOPT_OTHER_MEMBER_REL); + rel->reloptkind == RELOPT_OTHER_MEMBER_REL || + rel->reloptkind == RELOPT_UPPER_REL); - /* Fill portions of context common to join and base relation */ + /* Fill portions of context common to upper, join and base relation */ context.buf = buf; context.root = root; context.foreignrel = rel; + context.scanrel = (rel->reloptkind == RELOPT_UPPER_REL) ? + fpinfo->outerrel : rel; context.params_list = params_list; - /* Construct SELECT clause and FROM clause */ + /* Construct SELECT clause */ deparseSelectSql(tlist, retrieved_attrs, &context); /* - * Construct WHERE clause + * For upper relations, the WHERE clause is built from the remote + * conditions of the underlying scan relation; otherwise, we can use the + * supplied list of remote conditions directly. */ - if (remote_conds) + if (rel->reloptkind == RELOPT_UPPER_REL) { - appendStringInfo(buf, " WHERE "); - appendConditions(remote_conds, &context); + PgFdwRelationInfo *ofpinfo; + + ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private; + quals = ofpinfo->remote_conds; + } + else + quals = remote_conds; + + /* Construct FROM and WHERE clauses */ + deparseFromExpr(quals, &context); + + if (rel->reloptkind == RELOPT_UPPER_REL) + { + /* Append GROUP BY clause */ + appendGroupByClause(tlist, &context); + + /* Append HAVING clause */ + if (remote_conds) + { + appendStringInfo(buf, " HAVING "); + appendConditions(remote_conds, &context); + } } /* Add ORDER BY clause if we found any useful pathkeys */ @@ -803,7 +969,7 @@ deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root, RelOptInfo *rel, /* * Construct a simple SELECT statement that retrieves desired columns * of the specified foreign table, and append it to "buf". The output - * contains just "SELECT ... FROM ....". + * contains just "SELECT ... ". * * We also create an integer List of the columns being retrieved, which is * returned to *retrieved_attrs. @@ -824,7 +990,8 @@ deparseSelectSql(List *tlist, List **retrieved_attrs, deparse_expr_cxt *context) */ appendStringInfoString(buf, "SELECT "); - if (foreignrel->reloptkind == RELOPT_JOINREL) + if (foreignrel->reloptkind == RELOPT_JOINREL || + foreignrel->reloptkind == RELOPT_UPPER_REL) { /* For a join relation use the input tlist */ deparseExplicitTargetList(tlist, retrieved_attrs, context); @@ -847,14 +1014,38 @@ deparseSelectSql(List *tlist, List **retrieved_attrs, deparse_expr_cxt *context) fpinfo->attrs_used, false, retrieved_attrs); heap_close(rel, NoLock); } +} - /* - * Construct FROM clause - */ +/* + * Construct a FROM clause and, if needed, a WHERE clause, and append those to + * "buf". + * + * quals is the list of clauses to be included in the WHERE clause. + */ +static void +deparseFromExpr(List *quals, deparse_expr_cxt *context) +{ + StringInfo buf = context->buf; + RelOptInfo *foreignrel = context->foreignrel; + RelOptInfo *scanrel = context->scanrel; + + /* For upper relations, scanrel must be either a joinrel or a baserel */ + Assert(foreignrel->reloptkind != RELOPT_UPPER_REL || + scanrel->reloptkind == RELOPT_JOINREL || + scanrel->reloptkind == RELOPT_BASEREL); + + /* Construct FROM clause */ appendStringInfoString(buf, " FROM "); - deparseFromExprForRel(buf, root, foreignrel, - (foreignrel->reloptkind == RELOPT_JOINREL), + deparseFromExprForRel(buf, context->root, scanrel, + (bms_num_members(scanrel->relids) > 1), context->params_list); + + /* Construct WHERE clause */ + if (quals != NIL) + { + appendStringInfo(buf, " WHERE "); + appendConditions(quals, context); + } } /* @@ -957,14 +1148,14 @@ deparseTargetList(StringInfo buf, /* * Deparse the appropriate locking clause (FOR UPDATE or FOR SHARE) for a - * given relation (context->foreignrel). + * given relation (context->scanrel). */ static void deparseLockingClause(deparse_expr_cxt *context) { StringInfo buf = context->buf; PlannerInfo *root = context->root; - RelOptInfo *rel = context->foreignrel; + RelOptInfo *rel = context->scanrel; int relid = -1; while ((relid = bms_next_member(rel->relids, relid)) >= 0) @@ -1024,7 +1215,7 @@ deparseLockingClause(deparse_expr_cxt *context) } /* Add the relation alias if we are here for a join relation */ - if (rel->reloptkind == RELOPT_JOINREL && + if (bms_num_members(rel->relids) > 1 && rc->strength != LCS_NONE) appendStringInfo(buf, " OF %s%d", REL_ALIAS_PREFIX, relid); } @@ -1036,7 +1227,7 @@ deparseLockingClause(deparse_expr_cxt *context) * Deparse conditions from the provided list and append them to buf. * * The conditions in the list are assumed to be ANDed. This function is used to - * deparse both WHERE clauses and JOIN .. ON clauses. + * deparse WHERE clauses, JOIN .. ON clauses and HAVING clauses. */ static void appendConditions(List *exprs, deparse_expr_cxt *context) @@ -1126,22 +1317,15 @@ deparseExplicitTargetList(List *tlist, List **retrieved_attrs, foreach(lc, tlist) { TargetEntry *tle = (TargetEntry *) lfirst(lc); - Var *var; /* Extract expression if TargetEntry node */ Assert(IsA(tle, TargetEntry)); - var = (Var *) tle->expr; - - /* We expect only Var nodes here */ - if (!IsA(var, Var)) - elog(ERROR, "non-Var not expected in target list"); if (i > 0) appendStringInfoString(buf, ", "); - deparseVar(var, context); + deparseExpr((Expr *) tle->expr, context); *retrieved_attrs = lappend_int(*retrieved_attrs, i + 1); - i++; } @@ -1192,6 +1376,7 @@ deparseFromExprForRel(StringInfo buf, PlannerInfo *root, RelOptInfo *foreignrel, context.buf = buf; context.foreignrel = foreignrel; + context.scanrel = foreignrel; context.root = root; context.params_list = params_list; @@ -1360,6 +1545,7 @@ deparseDirectUpdateSql(StringInfo buf, PlannerInfo *root, /* Set up context struct for recursion */ context.root = root; context.foreignrel = baserel; + context.scanrel = baserel; context.buf = buf; context.params_list = params_list; @@ -1444,6 +1630,7 @@ deparseDirectDeleteSql(StringInfo buf, PlannerInfo *root, /* Set up context struct for recursion */ context.root = root; context.foreignrel = baserel; + context.scanrel = baserel; context.buf = buf; context.params_list = params_list; @@ -1817,7 +2004,7 @@ deparseExpr(Expr *node, deparse_expr_cxt *context) deparseVar((Var *) node, context); break; case T_Const: - deparseConst((Const *) node, context); + deparseConst((Const *) node, context, 0); break; case T_Param: deparseParam((Param *) node, context); @@ -1849,6 +2036,9 @@ deparseExpr(Expr *node, deparse_expr_cxt *context) case T_ArrayExpr: deparseArrayExpr((ArrayExpr *) node, context); break; + case T_Aggref: + deparseAggref((Aggref *) node, context); + break; default: elog(ERROR, "unsupported expression type for deparse: %d", (int) nodeTag(node)); @@ -1867,10 +2057,12 @@ deparseExpr(Expr *node, deparse_expr_cxt *context) static void deparseVar(Var *node, deparse_expr_cxt *context) { - bool qualify_col = (context->foreignrel->reloptkind == RELOPT_JOINREL); + Relids relids = context->scanrel->relids; - if (bms_is_member(node->varno, context->foreignrel->relids) && - node->varlevelsup == 0) + /* Qualify columns when multiple relations are involved. */ + bool qualify_col = (bms_num_members(relids) > 1); + + if (bms_is_member(node->varno, relids) && node->varlevelsup == 0) deparseColumnRef(context->buf, node->varno, node->varattno, context->root, qualify_col); else @@ -1908,9 +2100,12 @@ deparseVar(Var *node, deparse_expr_cxt *context) * Deparse given constant value into context->buf. * * This function has to be kept in sync with ruleutils.c's get_const_expr. + * As for that function, showtype can be -1 to never show "::typename" decoration, + * or +1 to always show it, or 0 to show it only if the constant wouldn't be assumed + * to be the right type by default. */ static void -deparseConst(Const *node, deparse_expr_cxt *context) +deparseConst(Const *node, deparse_expr_cxt *context, int showtype) { StringInfo buf = context->buf; Oid typoutput; @@ -1922,9 +2117,10 @@ deparseConst(Const *node, deparse_expr_cxt *context) if (node->constisnull) { appendStringInfoString(buf, "NULL"); - appendStringInfo(buf, "::%s", - deparse_type_name(node->consttype, - node->consttypmod)); + if (showtype >= 0) + appendStringInfo(buf, "::%s", + deparse_type_name(node->consttype, + node->consttypmod)); return; } @@ -1974,9 +2170,14 @@ deparseConst(Const *node, deparse_expr_cxt *context) break; } + pfree(extval); + + if (showtype < 0) + return; + /* - * Append ::typename unless the constant will be implicitly typed as the - * right type when it is read in. + * For showtype == 0, append ::typename unless the constant will be + * implicitly typed as the right type when it is read in. * * XXX this code has to be kept in sync with the behavior of the parser, * especially make_const. @@ -1995,7 +2196,7 @@ deparseConst(Const *node, deparse_expr_cxt *context) needlabel = true; break; } - if (needlabel) + if (needlabel || showtype > 0) appendStringInfo(buf, "::%s", deparse_type_name(node->consttype, node->consttypmod)); @@ -2092,9 +2293,6 @@ static void deparseFuncExpr(FuncExpr *node, deparse_expr_cxt *context) { StringInfo buf = context->buf; - HeapTuple proctup; - Form_pg_proc procform; - const char *proname; bool use_variadic; bool first; ListCell *arg; @@ -2127,29 +2325,15 @@ deparseFuncExpr(FuncExpr *node, deparse_expr_cxt *context) return; } - /* - * Normal function: display as proname(args). - */ - proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(node->funcid)); - if (!HeapTupleIsValid(proctup)) - elog(ERROR, "cache lookup failed for function %u", node->funcid); - procform = (Form_pg_proc) GETSTRUCT(proctup); - /* Check if need to print VARIADIC (cf. ruleutils.c) */ use_variadic = node->funcvariadic; - /* Print schema name only if it's not pg_catalog */ - if (procform->pronamespace != PG_CATALOG_NAMESPACE) - { - const char *schemaname; + /* + * Normal function: display as proname(args). + */ + appendFunctionName(node->funcid, context); + appendStringInfoChar(buf, '('); - schemaname = get_namespace_name(procform->pronamespace); - appendStringInfo(buf, "%s.", quote_identifier(schemaname)); - } - - /* Deparse the function name ... */ - proname = NameStr(procform->proname); - appendStringInfo(buf, "%s(", quote_identifier(proname)); /* ... and all the arguments */ first = true; foreach(arg, node->args) @@ -2162,8 +2346,6 @@ deparseFuncExpr(FuncExpr *node, deparse_expr_cxt *context) first = false; } appendStringInfoChar(buf, ')'); - - ReleaseSysCache(proctup); } /* @@ -2419,6 +2601,152 @@ deparseArrayExpr(ArrayExpr *node, deparse_expr_cxt *context) deparse_type_name(node->array_typeid, -1)); } +/* + * Deparse an Aggref node. + */ +static void +deparseAggref(Aggref *node, deparse_expr_cxt *context) +{ + StringInfo buf = context->buf; + bool use_variadic; + + /* Only basic, non-split aggregation accepted. */ + Assert(node->aggsplit == AGGSPLIT_SIMPLE); + + /* Check if need to print VARIADIC (cf. ruleutils.c) */ + use_variadic = node->aggvariadic; + + /* Find aggregate name from aggfnoid which is a pg_proc entry */ + appendFunctionName(node->aggfnoid, context); + appendStringInfoChar(buf, '('); + + /* Add DISTINCT */ + appendStringInfo(buf, "%s", (node->aggdistinct != NIL) ? "DISTINCT " : ""); + + if (AGGKIND_IS_ORDERED_SET(node->aggkind)) + { + /* Add WITHIN GROUP (ORDER BY ..) */ + ListCell *arg; + bool first = true; + + Assert(!node->aggvariadic); + Assert(node->aggorder != NIL); + + foreach(arg, node->aggdirectargs) + { + if (!first) + appendStringInfoString(buf, ", "); + first = false; + + deparseExpr((Expr *) lfirst(arg), context); + } + + appendStringInfoString(buf, ") WITHIN GROUP (ORDER BY "); + appendAggOrderBy(node->aggorder, node->args, context); + } + else + { + /* aggstar can be set only in zero-argument aggregates */ + if (node->aggstar) + appendStringInfoChar(buf, '*'); + else + { + ListCell *arg; + bool first = true; + + /* Add all the arguments */ + foreach(arg, node->args) + { + TargetEntry *tle = (TargetEntry *) lfirst(arg); + Node *n = (Node *) tle->expr; + + if (tle->resjunk) + continue; + + if (!first) + appendStringInfoString(buf, ", "); + first = false; + + /* Add VARIADIC */ + if (use_variadic && lnext(arg) == NULL) + appendStringInfoString(buf, "VARIADIC "); + + deparseExpr((Expr *) n, context); + } + } + + /* Add ORDER BY */ + if (node->aggorder != NIL) + { + appendStringInfoString(buf, " ORDER BY "); + appendAggOrderBy(node->aggorder, node->args, context); + } + } + + /* Add FILTER (WHERE ..) */ + if (node->aggfilter != NULL) + { + appendStringInfoString(buf, ") FILTER (WHERE "); + deparseExpr((Expr *) node->aggfilter, context); + } + + appendStringInfoChar(buf, ')'); +} + +/* + * Append ORDER BY within aggregate function. + */ +static void +appendAggOrderBy(List *orderList, List *targetList, deparse_expr_cxt *context) +{ + StringInfo buf = context->buf; + ListCell *lc; + bool first = true; + + foreach(lc, orderList) + { + SortGroupClause *srt = (SortGroupClause *) lfirst(lc); + Node *sortexpr; + Oid sortcoltype; + TypeCacheEntry *typentry; + + if (!first) + appendStringInfoString(buf, ", "); + first = false; + + sortexpr = deparseSortGroupClause(srt->tleSortGroupRef, targetList, + context); + sortcoltype = exprType(sortexpr); + /* See whether operator is default < or > for datatype */ + typentry = lookup_type_cache(sortcoltype, + TYPECACHE_LT_OPR | TYPECACHE_GT_OPR); + if (srt->sortop == typentry->lt_opr) + appendStringInfoString(buf, " ASC"); + else if (srt->sortop == typentry->gt_opr) + appendStringInfoString(buf, " DESC"); + else + { + HeapTuple opertup; + Form_pg_operator operform; + + appendStringInfoString(buf, " USING "); + + /* Append operator name. */ + opertup = SearchSysCache1(OPEROID, ObjectIdGetDatum(srt->sortop)); + if (!HeapTupleIsValid(opertup)) + elog(ERROR, "cache lookup failed for operator %u", srt->sortop); + operform = (Form_pg_operator) GETSTRUCT(opertup); + deparseOperatorName(buf, operform); + ReleaseSysCache(opertup); + } + + if (srt->nulls_first) + appendStringInfoString(buf, " NULLS FIRST"); + else + appendStringInfoString(buf, " NULLS LAST"); + } +} + /* * Print the representation of a parameter to be sent to the remote side. * @@ -2463,6 +2791,41 @@ printRemotePlaceholder(Oid paramtype, int32 paramtypmod, appendStringInfo(buf, "((SELECT null::%s)::%s)", ptypename, ptypename); } +/* + * Deparse GROUP BY clause. + */ +static void +appendGroupByClause(List *tlist, deparse_expr_cxt *context) +{ + StringInfo buf = context->buf; + Query *query = context->root->parse; + ListCell *lc; + bool first = true; + + /* Nothing to be done, if there's no GROUP BY clause in the query. */ + if (!query->groupClause) + return; + + appendStringInfo(buf, " GROUP BY "); + + /* + * Queries with grouping sets are not pushed down, so we don't expect + * grouping sets here. + */ + Assert(!query->groupingSets); + + foreach(lc, query->groupClause) + { + SortGroupClause *grp = (SortGroupClause *) lfirst(lc); + + if (!first) + appendStringInfoString(buf, ", "); + first = false; + + deparseSortGroupClause(grp->tleSortGroupRef, tlist, context); + } +} + /* * Deparse ORDER BY clause according to the given pathkeys for given base * relation. From given pathkeys expressions belonging entirely to the given @@ -2474,7 +2837,7 @@ appendOrderByClause(List *pathkeys, deparse_expr_cxt *context) ListCell *lcell; int nestlevel; char *delim = " "; - RelOptInfo *baserel = context->foreignrel; + RelOptInfo *baserel = context->scanrel; StringInfo buf = context->buf; /* Make sure any constants in the exprs are printed portably */ @@ -2505,3 +2868,74 @@ appendOrderByClause(List *pathkeys, deparse_expr_cxt *context) } reset_transmission_modes(nestlevel); } + +/* + * appendFunctionName + * Deparses function name from given function oid. + */ +static void +appendFunctionName(Oid funcid, deparse_expr_cxt *context) +{ + StringInfo buf = context->buf; + HeapTuple proctup; + Form_pg_proc procform; + const char *proname; + + proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(funcid)); + if (!HeapTupleIsValid(proctup)) + elog(ERROR, "cache lookup failed for function %u", funcid); + procform = (Form_pg_proc) GETSTRUCT(proctup); + + /* Print schema name only if it's not pg_catalog */ + if (procform->pronamespace != PG_CATALOG_NAMESPACE) + { + const char *schemaname; + + schemaname = get_namespace_name(procform->pronamespace); + appendStringInfo(buf, "%s.", quote_identifier(schemaname)); + } + + /* Always print the function name */ + proname = NameStr(procform->proname); + appendStringInfo(buf, "%s", quote_identifier(proname)); + + ReleaseSysCache(proctup); +} + +/* + * Appends a sort or group clause. + * + * Like get_rule_sortgroupclause(), returns the expression tree, so caller + * need not find it again. + */ +static Node * +deparseSortGroupClause(Index ref, List *tlist, deparse_expr_cxt *context) +{ + StringInfo buf = context->buf; + TargetEntry *tle; + Expr *expr; + + tle = get_sortgroupref_tle(ref, tlist); + expr = tle->expr; + + if (expr && IsA(expr, Const)) + { + /* + * Force a typecast here so that we don't emit something like "GROUP + * BY 2", which will be misconstrued as a column position rather than + * a constant. + */ + deparseConst((Const *) expr, context, 1); + } + else if (!expr || IsA(expr, Var)) + deparseExpr(expr, context); + else + { + /* Always parenthesize the expression. */ + appendStringInfoString(buf, "("); + deparseExpr(expr, context); + appendStringInfoString(buf, ")"); + } + + return (Node *) expr; +} diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index d97e694d1a..d3f37adad6 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -861,14 +861,13 @@ CREATE OPERATOR === ( -- built-in operators and functions can be shipped for remote execution EXPLAIN (VERBOSE, COSTS OFF) SELECT count(c3) FROM ft1 t1 WHERE t1.c1 = abs(t1.c2); - QUERY PLAN --------------------------------------------------------------------------- - Aggregate - Output: count(c3) - -> Foreign Scan on public.ft1 t1 - Output: c3 - Remote SQL: SELECT c3 FROM "S 1"."T 1" WHERE (("C 1" = abs(c2))) -(5 rows) + QUERY PLAN +--------------------------------------------------------------------------- + Foreign Scan + Output: (count(c3)) + Relations: Aggregate on (public.ft1 t1) + Remote SQL: SELECT count(c3) FROM "S 1"."T 1" WHERE (("C 1" = abs(c2))) +(4 rows) SELECT count(c3) FROM ft1 t1 WHERE t1.c1 = abs(t1.c2); count @@ -878,14 +877,13 @@ SELECT count(c3) FROM ft1 t1 WHERE t1.c1 = abs(t1.c2); EXPLAIN (VERBOSE, COSTS OFF) SELECT count(c3) FROM ft1 t1 WHERE t1.c1 = t1.c2; - QUERY PLAN ---------------------------------------------------------------------- - Aggregate - Output: count(c3) - -> Foreign Scan on public.ft1 t1 - Output: c3 - Remote SQL: SELECT c3 FROM "S 1"."T 1" WHERE (("C 1" = c2)) -(5 rows) + QUERY PLAN +---------------------------------------------------------------------- + Foreign Scan + Output: (count(c3)) + Relations: Aggregate on (public.ft1 t1) + Remote SQL: SELECT count(c3) FROM "S 1"."T 1" WHERE (("C 1" = c2)) +(4 rows) SELECT count(c3) FROM ft1 t1 WHERE t1.c1 = t1.c2; count @@ -937,14 +935,13 @@ ALTER SERVER loopback OPTIONS (ADD extensions 'postgres_fdw'); -- ... now they can be shipped EXPLAIN (VERBOSE, COSTS OFF) SELECT count(c3) FROM ft1 t1 WHERE t1.c1 = postgres_fdw_abs(t1.c2); - QUERY PLAN ----------------------------------------------------------------------------------------------- - Aggregate - Output: count(c3) - -> Foreign Scan on public.ft1 t1 - Output: c3 - Remote SQL: SELECT c3 FROM "S 1"."T 1" WHERE (("C 1" = public.postgres_fdw_abs(c2))) -(5 rows) + QUERY PLAN +----------------------------------------------------------------------------------------------- + Foreign Scan + Output: (count(c3)) + Relations: Aggregate on (public.ft1 t1) + Remote SQL: SELECT count(c3) FROM "S 1"."T 1" WHERE (("C 1" = public.postgres_fdw_abs(c2))) +(4 rows) SELECT count(c3) FROM ft1 t1 WHERE t1.c1 = postgres_fdw_abs(t1.c2); count @@ -954,14 +951,13 @@ SELECT count(c3) FROM ft1 t1 WHERE t1.c1 = postgres_fdw_abs(t1.c2); EXPLAIN (VERBOSE, COSTS OFF) SELECT count(c3) FROM ft1 t1 WHERE t1.c1 === t1.c2; - QUERY PLAN ----------------------------------------------------------------------------------------- - Aggregate - Output: count(c3) - -> Foreign Scan on public.ft1 t1 - Output: c3 - Remote SQL: SELECT c3 FROM "S 1"."T 1" WHERE (("C 1" OPERATOR(public.===) c2)) -(5 rows) + QUERY PLAN +----------------------------------------------------------------------------------------- + Foreign Scan + Output: (count(c3)) + Relations: Aggregate on (public.ft1 t1) + Remote SQL: SELECT count(c3) FROM "S 1"."T 1" WHERE (("C 1" OPERATOR(public.===) c2)) +(4 rows) SELECT count(c3) FROM ft1 t1 WHERE t1.c1 === t1.c2; count @@ -2282,6 +2278,1138 @@ ALTER VIEW v4 OWNER TO regress_view_owner; -- cleanup DROP OWNED BY regress_view_owner; DROP ROLE regress_view_owner; +-- =================================================================== +-- Aggregate and grouping queries +-- =================================================================== +-- Simple aggregates +explain (verbose, costs off) +select count(c6), sum(c1), avg(c1), min(c2), max(c1), stddev(c2), sum(c1) * (random() <= 1)::int as sum2 from ft1 where c2 < 5 group by c2 order by 1, 2; + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------------------------------------- + Result + Output: (count(c6)), (sum(c1)), (avg(c1)), (min(c2)), (max(c1)), (stddev(c2)), ((sum(c1)) * ((random() <= '1'::double precision))::integer), c2 + -> Sort + Output: (count(c6)), (sum(c1)), (avg(c1)), (min(c2)), (max(c1)), (stddev(c2)), c2 + Sort Key: (count(ft1.c6)), (sum(ft1.c1)) + -> Foreign Scan + Output: (count(c6)), (sum(c1)), (avg(c1)), (min(c2)), (max(c1)), (stddev(c2)), c2 + Relations: Aggregate on (public.ft1) + Remote SQL: SELECT count(c6), sum("C 1"), avg("C 1"), min(c2), max("C 1"), stddev(c2), c2 FROM "S 1"."T 1" WHERE ((c2 < 5)) GROUP BY c2 +(9 rows) + +select count(c6), sum(c1), avg(c1), min(c2), max(c1), stddev(c2), sum(c1) * (random() <= 1)::int as sum2 from ft1 where c2 < 5 group by c2 order by 1, 2; + count | sum | avg | min | max | stddev | sum2 +-------+-------+----------------------+-----+------+--------+------- + 100 | 49600 | 496.0000000000000000 | 1 | 991 | 0 | 49600 + 100 | 49700 | 497.0000000000000000 | 2 | 992 | 0 | 49700 + 100 | 49800 | 498.0000000000000000 | 3 | 993 | 0 | 49800 + 100 | 49900 | 499.0000000000000000 | 4 | 994 | 0 | 49900 + 100 | 50500 | 505.0000000000000000 | 0 | 1000 | 0 | 50500 +(5 rows) + +-- Aggregate is not pushed down as aggregation contains random() +explain (verbose, costs off) +select sum(c1 * (random() <= 1)::int) as sum, avg(c1) from ft1; + QUERY PLAN +------------------------------------------------------------------------------- + Aggregate + Output: sum((c1 * ((random() <= '1'::double precision))::integer)), avg(c1) + -> Foreign Scan on public.ft1 + Output: c1 + Remote SQL: SELECT "C 1" FROM "S 1"."T 1" +(5 rows) + +-- Aggregate over join query +explain (verbose, costs off) +select count(*), sum(t1.c1), avg(t2.c1) from ft1 t1 inner join ft1 t2 on (t1.c2 = t2.c2) where t1.c2 = 6; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------------------- + Foreign Scan + Output: (count(*)), (sum(t1.c1)), (avg(t2.c1)) + Relations: Aggregate on ((public.ft1 t1) INNER JOIN (public.ft1 t2)) + Remote SQL: SELECT count(*), sum(r1."C 1"), avg(r2."C 1") FROM ("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (((r2.c2 = 6)) AND ((r1.c2 = 6)))) +(4 rows) + +select count(*), sum(t1.c1), avg(t2.c1) from ft1 t1 inner join ft1 t2 on (t1.c2 = t2.c2) where t1.c2 = 6; + count | sum | avg +-------+---------+---------------------- + 10000 | 5010000 | 501.0000000000000000 +(1 row) + +-- Not pushed down due to local conditions present in underneath input rel +explain (verbose, costs off) +select sum(t1.c1), count(t2.c1) from ft1 t1 inner join ft2 t2 on (t1.c1 = t2.c1) where ((t1.c1 * t2.c1)/(t1.c1 * t2.c1)) * random() <= 1; + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------- + Aggregate + Output: sum(t1.c1), count(t2.c1) + -> Foreign Scan + Output: t1.c1, t2.c1 + Filter: (((((t1.c1 * t2.c1) / (t1.c1 * t2.c1)))::double precision * random()) <= '1'::double precision) + Relations: (public.ft1 t1) INNER JOIN (public.ft2 t2) + Remote SQL: SELECT r1."C 1", r2."C 1" FROM ("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (((r1."C 1" = r2."C 1")))) +(7 rows) + +-- GROUP BY clause having expressions +explain (verbose, costs off) +select c2/2, sum(c2) * (c2/2) from ft1 group by c2/2 order by c2/2; + QUERY PLAN +------------------------------------------------------------------------------------------------ + Sort + Output: ((c2 / 2)), ((sum(c2) * (c2 / 2))) + Sort Key: ((ft1.c2 / 2)) + -> Foreign Scan + Output: ((c2 / 2)), ((sum(c2) * (c2 / 2))) + Relations: Aggregate on (public.ft1) + Remote SQL: SELECT (c2 / 2), (sum(c2) * (c2 / 2)) FROM "S 1"."T 1" GROUP BY ((c2 / 2)) +(7 rows) + +select c2/2, sum(c2) * (c2/2) from ft1 group by c2/2 order by c2/2; + ?column? | ?column? +----------+---------- + 0 | 0 + 1 | 500 + 2 | 1800 + 3 | 3900 + 4 | 6800 +(5 rows) + +-- Aggregates in subquery are pushed down. +explain (verbose, costs off) +select count(x.a), sum(x.a) from (select c2 a, sum(c1) b from ft1 group by c2, sqrt(c1) order by 1, 2) x; + QUERY PLAN +---------------------------------------------------------------------------------------------------------- + Aggregate + Output: count(ft1.c2), sum(ft1.c2) + -> Sort + Output: ft1.c2, (sum(ft1.c1)), (sqrt((ft1.c1)::double precision)) + Sort Key: ft1.c2, (sum(ft1.c1)) + -> Foreign Scan + Output: ft1.c2, (sum(ft1.c1)), (sqrt((ft1.c1)::double precision)) + Relations: Aggregate on (public.ft1) + Remote SQL: SELECT c2, sum("C 1"), sqrt("C 1") FROM "S 1"."T 1" GROUP BY c2, (sqrt("C 1")) +(9 rows) + +select count(x.a), sum(x.a) from (select c2 a, sum(c1) b from ft1 group by c2, sqrt(c1) order by 1, 2) x; + count | sum +-------+------ + 1000 | 4500 +(1 row) + +-- Aggregate is still pushed down by taking unshippable expression out +explain (verbose, costs off) +select c2 * (random() <= 1)::int as sum1, sum(c1) * c2 as sum2 from ft1 group by c2 order by 1, 2; + QUERY PLAN +--------------------------------------------------------------------------------------------------- + Sort + Output: ((c2 * ((random() <= '1'::double precision))::integer)), ((sum(c1) * c2)), c2 + Sort Key: ((ft1.c2 * ((random() <= '1'::double precision))::integer)), ((sum(ft1.c1) * ft1.c2)) + -> Foreign Scan + Output: (c2 * ((random() <= '1'::double precision))::integer), ((sum(c1) * c2)), c2 + Relations: Aggregate on (public.ft1) + Remote SQL: SELECT (sum("C 1") * c2), c2 FROM "S 1"."T 1" GROUP BY c2 +(7 rows) + +select c2 * (random() <= 1)::int as sum1, sum(c1) * c2 as sum2 from ft1 group by c2 order by 1, 2; + sum1 | sum2 +------+-------- + 0 | 0 + 1 | 49600 + 2 | 99400 + 3 | 149400 + 4 | 199600 + 5 | 250000 + 6 | 300600 + 7 | 351400 + 8 | 402400 + 9 | 453600 +(10 rows) + +-- Aggregate with unshippable GROUP BY clause are not pushed +explain (verbose, costs off) +select c2 * (random() <= 1)::int as c2 from ft2 group by c2 * (random() <= 1)::int order by 1; + QUERY PLAN +------------------------------------------------------------------------------ + Sort + Output: ((c2 * ((random() <= '1'::double precision))::integer)) + Sort Key: ((ft2.c2 * ((random() <= '1'::double precision))::integer)) + -> HashAggregate + Output: ((c2 * ((random() <= '1'::double precision))::integer)) + Group Key: (ft2.c2 * ((random() <= '1'::double precision))::integer) + -> Foreign Scan on public.ft2 + Output: (c2 * ((random() <= '1'::double precision))::integer) + Remote SQL: SELECT c2 FROM "S 1"."T 1" +(9 rows) + +-- GROUP BY clause in various forms, cardinal, alias and constant expression +explain (verbose, costs off) +select count(c2) w, c2 x, 5 y, 7.0 z from ft1 group by 2, y, 9.0::int order by 2; + QUERY PLAN +---------------------------------------------------------------------------------------------------------- + Sort + Output: (count(c2)), c2, (5), (7.0), (9) + Sort Key: ft1.c2 + -> Foreign Scan + Output: (count(c2)), c2, (5), (7.0), (9) + Relations: Aggregate on (public.ft1) + Remote SQL: SELECT count(c2), c2, 5, 7.0, 9 FROM "S 1"."T 1" GROUP BY c2, 5::integer, 9::integer +(7 rows) + +select count(c2) w, c2 x, 5 y, 7.0 z from ft1 group by 2, y, 9.0::int order by 2; + w | x | y | z +-----+---+---+----- + 100 | 0 | 5 | 7.0 + 100 | 1 | 5 | 7.0 + 100 | 2 | 5 | 7.0 + 100 | 3 | 5 | 7.0 + 100 | 4 | 5 | 7.0 + 100 | 5 | 5 | 7.0 + 100 | 6 | 5 | 7.0 + 100 | 7 | 5 | 7.0 + 100 | 8 | 5 | 7.0 + 100 | 9 | 5 | 7.0 +(10 rows) + +-- Testing HAVING clause shippability +explain (verbose, costs off) +select c2, sum(c1) from ft2 group by c2 having avg(c1) < 500 and sum(c1) < 49800 order by c2; + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------- + Sort + Output: c2, (sum(c1)) + Sort Key: ft2.c2 + -> Foreign Scan + Output: c2, (sum(c1)) + Relations: Aggregate on (public.ft2) + Remote SQL: SELECT c2, sum("C 1") FROM "S 1"."T 1" GROUP BY c2 HAVING ((avg("C 1") < 500::numeric)) AND ((sum("C 1") < 49800)) +(7 rows) + +select c2, sum(c1) from ft2 group by c2 having avg(c1) < 500 and sum(c1) < 49800 order by c2; + c2 | sum +----+------- + 1 | 49600 + 2 | 49700 +(2 rows) + +-- Using expressions in HAVING clause +explain (verbose, costs off) +select c5, count(c2) from ft1 group by c5, sqrt(c2) having sqrt(max(c2)) = sqrt(2) order by 1, 2; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------------------------------- + Sort + Output: c5, (count(c2)), (sqrt((c2)::double precision)) + Sort Key: ft1.c5, (count(ft1.c2)) + -> Foreign Scan + Output: c5, (count(c2)), (sqrt((c2)::double precision)) + Relations: Aggregate on (public.ft1) + Remote SQL: SELECT c5, count(c2), sqrt(c2) FROM "S 1"."T 1" GROUP BY c5, (sqrt(c2)) HAVING ((sqrt(max(c2)) = 1.41421356237309515::double precision)) +(7 rows) + +select c5, count(c2) from ft1 group by c5, sqrt(c2) having sqrt(max(c2)) = sqrt(2) order by 1, 2; + c5 | count +--------------------------+------- + Sat Jan 03 00:00:00 1970 | 10 + Tue Jan 13 00:00:00 1970 | 10 + Fri Jan 23 00:00:00 1970 | 10 + Mon Feb 02 00:00:00 1970 | 10 + Thu Feb 12 00:00:00 1970 | 10 + Sun Feb 22 00:00:00 1970 | 10 + Wed Mar 04 00:00:00 1970 | 10 + Sat Mar 14 00:00:00 1970 | 10 + Tue Mar 24 00:00:00 1970 | 10 + Fri Apr 03 00:00:00 1970 | 10 +(10 rows) + +-- Unshippable HAVING clause will be evaluated locally, and other qual in HAVING clause is pushed down +explain (verbose, costs off) +select count(*) from (select c5, count(c1) from ft1 group by c5, sqrt(c2) having (avg(c1) / avg(c1)) * random() <= 1 and avg(c1) < 500) x; + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------------------------------- + Aggregate + Output: count(*) + -> Foreign Scan + Output: ft1.c5, (NULL::bigint), (sqrt((ft1.c2)::double precision)) + Filter: (((((avg(ft1.c1)) / (avg(ft1.c1))))::double precision * random()) <= '1'::double precision) + Relations: Aggregate on (public.ft1) + Remote SQL: SELECT c5, NULL::bigint, sqrt(c2), avg("C 1") FROM "S 1"."T 1" GROUP BY c5, (sqrt(c2)) HAVING ((avg("C 1") < 500::numeric)) +(7 rows) + +select count(*) from (select c5, count(c1) from ft1 group by c5, sqrt(c2) having (avg(c1) / avg(c1)) * random() <= 1 and avg(c1) < 500) x; + count +------- + 49 +(1 row) + +-- Aggregate in HAVING clause is not pushable, and thus aggregation is not pushed down +explain (verbose, costs off) +select sum(c1) from ft1 group by c2 having avg(c1 * (random() <= 1)::int) > 100 order by 1; + QUERY PLAN +--------------------------------------------------------------------------------------------------- + Sort + Output: (sum(c1)), c2 + Sort Key: (sum(ft1.c1)) + -> HashAggregate + Output: sum(c1), c2 + Group Key: ft1.c2 + Filter: (avg((ft1.c1 * ((random() <= '1'::double precision))::integer)) > '100'::numeric) + -> Foreign Scan on public.ft1 + Output: c2, c1 + Remote SQL: SELECT "C 1", c2 FROM "S 1"."T 1" +(10 rows) + +-- Testing ORDER BY, DISTINCT, FILTER, Ordered-sets and VARIADIC within aggregates +-- ORDER BY within aggregate, same column used to order +explain (verbose, costs off) +select array_agg(c1 order by c1) from ft1 where c1 < 100 group by c2 order by 1; + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------- + Sort + Output: (array_agg(c1 ORDER BY c1)), c2 + Sort Key: (array_agg(ft1.c1 ORDER BY ft1.c1)) + -> Foreign Scan + Output: (array_agg(c1 ORDER BY c1)), c2 + Relations: Aggregate on (public.ft1) + Remote SQL: SELECT array_agg("C 1" ORDER BY "C 1" ASC NULLS LAST), c2 FROM "S 1"."T 1" WHERE (("C 1" < 100)) GROUP BY c2 +(7 rows) + +select array_agg(c1 order by c1) from ft1 where c1 < 100 group by c2 order by 1; + array_agg +-------------------------------- + {1,11,21,31,41,51,61,71,81,91} + {2,12,22,32,42,52,62,72,82,92} + {3,13,23,33,43,53,63,73,83,93} + {4,14,24,34,44,54,64,74,84,94} + {5,15,25,35,45,55,65,75,85,95} + {6,16,26,36,46,56,66,76,86,96} + {7,17,27,37,47,57,67,77,87,97} + {8,18,28,38,48,58,68,78,88,98} + {9,19,29,39,49,59,69,79,89,99} + {10,20,30,40,50,60,70,80,90} +(10 rows) + +-- ORDER BY within aggregate, different column used to order also using DESC +explain (verbose, costs off) +select array_agg(c5 order by c1 desc) from ft2 where c2 = 6 and c1 < 50; + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------- + Foreign Scan + Output: (array_agg(c5 ORDER BY c1 DESC)) + Relations: Aggregate on (public.ft2) + Remote SQL: SELECT array_agg(c5 ORDER BY "C 1" DESC NULLS FIRST) FROM "S 1"."T 1" WHERE (("C 1" < 50)) AND ((c2 = 6)) +(4 rows) + +select array_agg(c5 order by c1 desc) from ft2 where c2 = 6 and c1 < 50; + array_agg +------------------------------------------------------------------------------------------------------------------------------------------ + {"Mon Feb 16 00:00:00 1970","Fri Feb 06 00:00:00 1970","Tue Jan 27 00:00:00 1970","Sat Jan 17 00:00:00 1970","Wed Jan 07 00:00:00 1970"} +(1 row) + +-- DISTINCT within aggregate +explain (verbose, costs off) +select array_agg(distinct (t1.c1)%5) from ft4 t1 full join ft5 t2 on (t1.c1 = t2.c1) where t1.c1 < 20 or (t1.c1 is null and t2.c1 < 5) group by (t2.c1)%3 order by 1; + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Sort + Output: (array_agg(DISTINCT (t1.c1 % 5))), ((t2.c1 % 3)) + Sort Key: (array_agg(DISTINCT (t1.c1 % 5))) + -> Foreign Scan + Output: (array_agg(DISTINCT (t1.c1 % 5))), ((t2.c1 % 3)) + Relations: Aggregate on ((public.ft4 t1) FULL JOIN (public.ft5 t2)) + Remote SQL: SELECT array_agg(DISTINCT (r1.c1 % 5)), (r2.c1 % 3) FROM ("S 1"."T 3" r1 FULL JOIN "S 1"."T 4" r2 ON (((r1.c1 = r2.c1)))) WHERE (((r1.c1 < 20) OR ((r1.c1 IS NULL) AND (r2.c1 < 5)))) GROUP BY ((r2.c1 % 3)) +(7 rows) + +select array_agg(distinct (t1.c1)%5) from ft4 t1 full join ft5 t2 on (t1.c1 = t2.c1) where t1.c1 < 20 or (t1.c1 is null and t2.c1 < 5) group by (t2.c1)%3 order by 1; + array_agg +-------------- + {0,1,2,3,4} + {1,2,3,NULL} +(2 rows) + +-- DISTINCT combined with ORDER BY within aggregate +explain (verbose, costs off) +select array_agg(distinct (t1.c1)%5 order by (t1.c1)%5) from ft4 t1 full join ft5 t2 on (t1.c1 = t2.c1) where t1.c1 < 20 or (t1.c1 is null and t2.c1 < 5) group by (t2.c1)%3 order by 1; + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ + Sort + Output: (array_agg(DISTINCT (t1.c1 % 5) ORDER BY (t1.c1 % 5))), ((t2.c1 % 3)) + Sort Key: (array_agg(DISTINCT (t1.c1 % 5) ORDER BY (t1.c1 % 5))) + -> Foreign Scan + Output: (array_agg(DISTINCT (t1.c1 % 5) ORDER BY (t1.c1 % 5))), ((t2.c1 % 3)) + Relations: Aggregate on ((public.ft4 t1) FULL JOIN (public.ft5 t2)) + Remote SQL: SELECT array_agg(DISTINCT (r1.c1 % 5) ORDER BY ((r1.c1 % 5)) ASC NULLS LAST), (r2.c1 % 3) FROM ("S 1"."T 3" r1 FULL JOIN "S 1"."T 4" r2 ON (((r1.c1 = r2.c1)))) WHERE (((r1.c1 < 20) OR ((r1.c1 IS NULL) AND (r2.c1 < 5)))) GROUP BY ((r2.c1 % 3)) +(7 rows) + +select array_agg(distinct (t1.c1)%5 order by (t1.c1)%5) from ft4 t1 full join ft5 t2 on (t1.c1 = t2.c1) where t1.c1 < 20 or (t1.c1 is null and t2.c1 < 5) group by (t2.c1)%3 order by 1; + array_agg +-------------- + {0,1,2,3,4} + {1,2,3,NULL} +(2 rows) + +explain (verbose, costs off) +select array_agg(distinct (t1.c1)%5 order by (t1.c1)%5 desc nulls last) from ft4 t1 full join ft5 t2 on (t1.c1 = t2.c1) where t1.c1 < 20 or (t1.c1 is null and t2.c1 < 5) group by (t2.c1)%3 order by 1; + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Sort + Output: (array_agg(DISTINCT (t1.c1 % 5) ORDER BY (t1.c1 % 5) DESC NULLS LAST)), ((t2.c1 % 3)) + Sort Key: (array_agg(DISTINCT (t1.c1 % 5) ORDER BY (t1.c1 % 5) DESC NULLS LAST)) + -> Foreign Scan + Output: (array_agg(DISTINCT (t1.c1 % 5) ORDER BY (t1.c1 % 5) DESC NULLS LAST)), ((t2.c1 % 3)) + Relations: Aggregate on ((public.ft4 t1) FULL JOIN (public.ft5 t2)) + Remote SQL: SELECT array_agg(DISTINCT (r1.c1 % 5) ORDER BY ((r1.c1 % 5)) DESC NULLS LAST), (r2.c1 % 3) FROM ("S 1"."T 3" r1 FULL JOIN "S 1"."T 4" r2 ON (((r1.c1 = r2.c1)))) WHERE (((r1.c1 < 20) OR ((r1.c1 IS NULL) AND (r2.c1 < 5)))) GROUP BY ((r2.c1 % 3)) +(7 rows) + +select array_agg(distinct (t1.c1)%5 order by (t1.c1)%5 desc nulls last) from ft4 t1 full join ft5 t2 on (t1.c1 = t2.c1) where t1.c1 < 20 or (t1.c1 is null and t2.c1 < 5) group by (t2.c1)%3 order by 1; + array_agg +-------------- + {3,2,1,NULL} + {4,3,2,1,0} +(2 rows) + +-- FILTER within aggregate +explain (verbose, costs off) +select sum(c1) filter (where c1 < 100 and c2 > 5) from ft1 group by c2 order by 1 nulls last; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------- + Sort + Output: (sum(c1) FILTER (WHERE ((c1 < 100) AND (c2 > 5)))), c2 + Sort Key: (sum(ft1.c1) FILTER (WHERE ((ft1.c1 < 100) AND (ft1.c2 > 5)))) + -> Foreign Scan + Output: (sum(c1) FILTER (WHERE ((c1 < 100) AND (c2 > 5)))), c2 + Relations: Aggregate on (public.ft1) + Remote SQL: SELECT sum("C 1") FILTER (WHERE (("C 1" < 100) AND (c2 > 5))), c2 FROM "S 1"."T 1" GROUP BY c2 +(7 rows) + +select sum(c1) filter (where c1 < 100 and c2 > 5) from ft1 group by c2 order by 1 nulls last; + sum +----- + 510 + 520 + 530 + 540 + + + + + + +(10 rows) + +-- DISTINCT, ORDER BY and FILTER within aggregate +explain (verbose, costs off) +select sum(c1%3), sum(distinct c1%3 order by c1%3) filter (where c1%3 < 2), c2 from ft1 where c2 = 6 group by c2; + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Foreign Scan + Output: (sum((c1 % 3))), (sum(DISTINCT (c1 % 3) ORDER BY (c1 % 3)) FILTER (WHERE ((c1 % 3) < 2))), c2 + Relations: Aggregate on (public.ft1) + Remote SQL: SELECT sum(("C 1" % 3)), sum(DISTINCT ("C 1" % 3) ORDER BY (("C 1" % 3)) ASC NULLS LAST) FILTER (WHERE (("C 1" % 3) < 2)), c2 FROM "S 1"."T 1" WHERE ((c2 = 6)) GROUP BY c2 +(4 rows) + +select sum(c1%3), sum(distinct c1%3 order by c1%3) filter (where c1%3 < 2), c2 from ft1 where c2 = 6 group by c2; + sum | sum | c2 +-----+-----+---- + 99 | 1 | 6 +(1 row) + +-- Outer query is aggregation query +explain (verbose, costs off) +select distinct (select count(*) filter (where t2.c2 = 6 and t2.c1 < 10) from ft1 t1 where t1.c1 = 6) from ft2 t2 order by 1; + QUERY PLAN +------------------------------------------------------------------------------------------------------- + Unique + Output: ((SubPlan 1)) + -> Sort + Output: ((SubPlan 1)) + Sort Key: ((SubPlan 1)) + -> Foreign Scan + Output: (SubPlan 1) + Relations: Aggregate on (public.ft2 t2) + Remote SQL: SELECT count(*) FILTER (WHERE ((c2 = 6) AND ("C 1" < 10))) FROM "S 1"."T 1" + SubPlan 1 + -> Foreign Scan on public.ft1 t1 + Output: (count(*) FILTER (WHERE ((t2.c2 = 6) AND (t2.c1 < 10)))) + Remote SQL: SELECT NULL FROM "S 1"."T 1" WHERE (("C 1" = 6)) +(13 rows) + +select distinct (select count(*) filter (where t2.c2 = 6 and t2.c1 < 10) from ft1 t1 where t1.c1 = 6) from ft2 t2 order by 1; + count +------- + 1 +(1 row) + +-- Inner query is aggregation query +explain (verbose, costs off) +select distinct (select count(t1.c1) filter (where t2.c2 = 6 and t2.c1 < 10) from ft1 t1 where t1.c1 = 6) from ft2 t2 order by 1; + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------------------------------------ + Unique + Output: ((SubPlan 1)) + -> Sort + Output: ((SubPlan 1)) + Sort Key: ((SubPlan 1)) + -> Foreign Scan on public.ft2 t2 + Output: (SubPlan 1) + Remote SQL: SELECT "C 1", c2 FROM "S 1"."T 1" + SubPlan 1 + -> Foreign Scan + Output: (count(t1.c1) FILTER (WHERE ((t2.c2 = 6) AND (t2.c1 < 10)))) + Relations: Aggregate on (public.ft1 t1) + Remote SQL: SELECT count("C 1") FILTER (WHERE (($1::integer = 6) AND ($2::integer < 10))) FROM "S 1"."T 1" WHERE (("C 1" = 6)) +(13 rows) + +select distinct (select count(t1.c1) filter (where t2.c2 = 6 and t2.c1 < 10) from ft1 t1 where t1.c1 = 6) from ft2 t2 order by 1; + count +------- + 0 + 1 +(2 rows) + +-- Aggregate not pushed down as FILTER condition is not pushable +explain (verbose, costs off) +select sum(c1) filter (where (c1 / c1) * random() <= 1) from ft1 group by c2 order by 1; + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------ + Sort + Output: (sum(c1) FILTER (WHERE ((((c1 / c1))::double precision * random()) <= '1'::double precision))), c2 + Sort Key: (sum(ft1.c1) FILTER (WHERE ((((ft1.c1 / ft1.c1))::double precision * random()) <= '1'::double precision))) + -> HashAggregate + Output: sum(c1) FILTER (WHERE ((((c1 / c1))::double precision * random()) <= '1'::double precision)), c2 + Group Key: ft1.c2 + -> Foreign Scan on public.ft1 + Output: c2, c1 + Remote SQL: SELECT "C 1", c2 FROM "S 1"."T 1" +(9 rows) + +explain (verbose, costs off) +select sum(c2) filter (where c2 in (select c2 from ft1 where c2 < 5)) from ft1; + QUERY PLAN +------------------------------------------------------------------- + Aggregate + Output: sum(ft1.c2) FILTER (WHERE (hashed SubPlan 1)) + -> Foreign Scan on public.ft1 + Output: ft1.c2 + Remote SQL: SELECT c2 FROM "S 1"."T 1" + SubPlan 1 + -> Foreign Scan on public.ft1 ft1_1 + Output: ft1_1.c2 + Remote SQL: SELECT c2 FROM "S 1"."T 1" WHERE ((c2 < 5)) +(9 rows) + +-- Ordered-sets within aggregate +explain (verbose, costs off) +select c2, rank('10'::varchar) within group (order by c6), percentile_cont(c2/10::numeric) within group (order by c1) from ft1 where c2 < 10 group by c2 having percentile_cont(c2/10::numeric) within group (order by c1) < 500 order by c2; + QUERY PLAN +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Sort + Output: c2, (rank('10'::character varying) WITHIN GROUP (ORDER BY c6)), (percentile_cont((((c2)::numeric / '10'::numeric))::double precision) WITHIN GROUP (ORDER BY ((c1)::double precision))) + Sort Key: ft1.c2 + -> Foreign Scan + Output: c2, (rank('10'::character varying) WITHIN GROUP (ORDER BY c6)), (percentile_cont((((c2)::numeric / '10'::numeric))::double precision) WITHIN GROUP (ORDER BY ((c1)::double precision))) + Relations: Aggregate on (public.ft1) + Remote SQL: SELECT c2, rank('10'::character varying) WITHIN GROUP (ORDER BY c6 ASC NULLS LAST), percentile_cont((c2 / 10::numeric)) WITHIN GROUP (ORDER BY ("C 1") ASC NULLS LAST) FROM "S 1"."T 1" WHERE ((c2 < 10)) GROUP BY c2 HAVING ((percentile_cont((c2 / 10::numeric)) WITHIN GROUP (ORDER BY ("C 1") ASC NULLS LAST) < 500::double precision)) +(7 rows) + +select c2, rank('10'::varchar) within group (order by c6), percentile_cont(c2/10::numeric) within group (order by c1) from ft1 where c2 < 10 group by c2 having percentile_cont(c2/10::numeric) within group (order by c1) < 500 order by c2; + c2 | rank | percentile_cont +----+------+----------------- + 0 | 101 | 10 + 1 | 101 | 100 + 2 | 1 | 200 + 3 | 1 | 300 + 4 | 1 | 400 +(5 rows) + +-- Using multiple arguments within aggregates +explain (verbose, costs off) +select c1, rank(c1, c2) within group (order by c1, c2) from ft1 group by c1, c2 having c1 = 6 order by 1; + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Foreign Scan + Output: c1, (rank(c1, c2) WITHIN GROUP (ORDER BY c1, c2)), c2 + Relations: Aggregate on (public.ft1) + Remote SQL: SELECT "C 1", rank("C 1", c2) WITHIN GROUP (ORDER BY "C 1" ASC NULLS LAST, c2 ASC NULLS LAST), c2 FROM "S 1"."T 1" WHERE (("C 1" = 6)) GROUP BY "C 1", c2 +(4 rows) + +select c1, rank(c1, c2) within group (order by c1, c2) from ft1 group by c1, c2 having c1 = 6 order by 1; + c1 | rank +----+------ + 6 | 1 +(1 row) + +-- User defined function for user defined aggregate, VARIADIC +create function least_accum(anyelement, variadic anyarray) +returns anyelement language sql as + 'select least($1, min($2[i])) from generate_subscripts($2,1) g(i)'; +create aggregate least_agg(variadic items anyarray) ( + stype = anyelement, sfunc = least_accum +); +-- Not pushed down due to user defined aggregate +explain (verbose, costs off) +select c2, least_agg(c1) from ft1 group by c2 order by c2; + QUERY PLAN +------------------------------------------------------------- + Sort + Output: c2, (least_agg(VARIADIC ARRAY[c1])) + Sort Key: ft1.c2 + -> HashAggregate + Output: c2, least_agg(VARIADIC ARRAY[c1]) + Group Key: ft1.c2 + -> Foreign Scan on public.ft1 + Output: c2, c1 + Remote SQL: SELECT "C 1", c2 FROM "S 1"."T 1" +(9 rows) + +-- Add function and aggregate into extension +alter extension postgres_fdw add function least_accum(anyelement, variadic anyarray); +alter extension postgres_fdw add aggregate least_agg(variadic items anyarray); +alter server loopback options (set extensions 'postgres_fdw'); +-- Now aggregate will be pushed. Aggregate will display VARIADIC argument. +explain (verbose, costs off) +select c2, least_agg(c1) from ft1 where c2 < 100 group by c2 order by c2; + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------ + Sort + Output: c2, (least_agg(VARIADIC ARRAY[c1])) + Sort Key: ft1.c2 + -> Foreign Scan + Output: c2, (least_agg(VARIADIC ARRAY[c1])) + Relations: Aggregate on (public.ft1) + Remote SQL: SELECT c2, public.least_agg(VARIADIC ARRAY["C 1"]) FROM "S 1"."T 1" WHERE ((c2 < 100)) GROUP BY c2 +(7 rows) + +select c2, least_agg(c1) from ft1 where c2 < 100 group by c2 order by c2; + c2 | least_agg +----+----------- + 0 | 10 + 1 | 1 + 2 | 2 + 3 | 3 + 4 | 4 + 5 | 5 + 6 | 6 + 7 | 7 + 8 | 8 + 9 | 9 +(10 rows) + +-- Remove function and aggregate from extension +alter extension postgres_fdw drop function least_accum(anyelement, variadic anyarray); +alter extension postgres_fdw drop aggregate least_agg(variadic items anyarray); +alter server loopback options (set extensions 'postgres_fdw'); +-- Not pushed down as we have dropped objects from extension. +explain (verbose, costs off) +select c2, least_agg(c1) from ft1 group by c2 order by c2; + QUERY PLAN +------------------------------------------------------------- + Sort + Output: c2, (least_agg(VARIADIC ARRAY[c1])) + Sort Key: ft1.c2 + -> HashAggregate + Output: c2, least_agg(VARIADIC ARRAY[c1]) + Group Key: ft1.c2 + -> Foreign Scan on public.ft1 + Output: c2, c1 + Remote SQL: SELECT "C 1", c2 FROM "S 1"."T 1" +(9 rows) + +-- Cleanup +drop aggregate least_agg(variadic items anyarray); +drop function least_accum(anyelement, variadic anyarray); +-- Testing USING OPERATOR() in ORDER BY within aggregate. +-- For this, we need user defined operators along with operator family and +-- operator class. Create those and then add them in extension. Note that +-- user defined objects are considered unshippable unless they are part of +-- the extension. +create operator public.<^ ( + leftarg = int4, + rightarg = int4, + procedure = int4eq +); +create operator public.=^ ( + leftarg = int4, + rightarg = int4, + procedure = int4lt +); +create operator public.>^ ( + leftarg = int4, + rightarg = int4, + procedure = int4gt +); +create operator family my_op_family using btree; +create function my_op_cmp(a int, b int) returns int as + $$begin return btint4cmp(a, b); end $$ language plpgsql; +create operator class my_op_class for type int using btree family my_op_family as + operator 1 public.<^, + operator 3 public.=^, + operator 5 public.>^, + function 1 my_op_cmp(int, int); +-- This will not be pushed as user defined sort operator is not part of the +-- extension yet. +explain (verbose, costs off) +select array_agg(c1 order by c1 using operator(public.<^)) from ft2 where c2 = 6 and c1 < 100 group by c2; + QUERY PLAN +-------------------------------------------------------------------------------------------- + GroupAggregate + Output: array_agg(c1 ORDER BY c1 USING <^ NULLS LAST), c2 + Group Key: ft2.c2 + -> Foreign Scan on public.ft2 + Output: c2, c1 + Remote SQL: SELECT "C 1", c2 FROM "S 1"."T 1" WHERE (("C 1" < 100)) AND ((c2 = 6)) +(6 rows) + +-- Add into extension +alter extension postgres_fdw add operator class my_op_class using btree; +alter extension postgres_fdw add function my_op_cmp(a int, b int); +alter extension postgres_fdw add operator family my_op_family using btree; +alter extension postgres_fdw add operator public.<^(int, int); +alter extension postgres_fdw add operator public.=^(int, int); +alter extension postgres_fdw add operator public.>^(int, int); +alter server loopback options (set extensions 'postgres_fdw'); +-- Now this will be pushed as sort operator is part of the extension. +explain (verbose, costs off) +select array_agg(c1 order by c1 using operator(public.<^)) from ft2 where c2 = 6 and c1 < 100 group by c2; + QUERY PLAN +----------------------------------------------------------------------------------------------------------------------------------------------------------------- + Foreign Scan + Output: (array_agg(c1 ORDER BY c1 USING <^ NULLS LAST)), c2 + Relations: Aggregate on (public.ft2) + Remote SQL: SELECT array_agg("C 1" ORDER BY "C 1" USING OPERATOR(public.<^) NULLS LAST), c2 FROM "S 1"."T 1" WHERE (("C 1" < 100)) AND ((c2 = 6)) GROUP BY c2 +(4 rows) + +select array_agg(c1 order by c1 using operator(public.<^)) from ft2 where c2 = 6 and c1 < 100 group by c2; + array_agg +-------------------------------- + {6,16,26,36,46,56,66,76,86,96} +(1 row) + +-- Remove from extension +alter extension postgres_fdw drop operator class my_op_class using btree; +alter extension postgres_fdw drop function my_op_cmp(a int, b int); +alter extension postgres_fdw drop operator family my_op_family using btree; +alter extension postgres_fdw drop operator public.<^(int, int); +alter extension postgres_fdw drop operator public.=^(int, int); +alter extension postgres_fdw drop operator public.>^(int, int); +alter server loopback options (set extensions 'postgres_fdw'); +-- This will not be pushed as sort operator is now removed from the extension. +explain (verbose, costs off) +select array_agg(c1 order by c1 using operator(public.<^)) from ft2 where c2 = 6 and c1 < 100 group by c2; + QUERY PLAN +-------------------------------------------------------------------------------------------- + GroupAggregate + Output: array_agg(c1 ORDER BY c1 USING <^ NULLS LAST), c2 + Group Key: ft2.c2 + -> Foreign Scan on public.ft2 + Output: c2, c1 + Remote SQL: SELECT "C 1", c2 FROM "S 1"."T 1" WHERE (("C 1" < 100)) AND ((c2 = 6)) +(6 rows) + +-- Cleanup +drop operator class my_op_class using btree; +drop function my_op_cmp(a int, b int); +drop operator family my_op_family using btree; +drop operator public.>^(int, int); +drop operator public.=^(int, int); +drop operator public.<^(int, int); +-- Input relation to aggregate push down hook is not safe to pushdown and thus +-- the aggregate cannot be pushed down to foreign server. +explain (verbose, costs off) +select count(t1.c3) from ft1 t1, ft1 t2 where t1.c1 = postgres_fdw_abs(t1.c2); + QUERY PLAN +---------------------------------------------------------------------------------------------------------- + Aggregate + Output: count(t1.c3) + -> Nested Loop + Output: t1.c3 + -> Foreign Scan on public.ft1 t2 + Remote SQL: SELECT NULL FROM "S 1"."T 1" + -> Materialize + Output: t1.c3 + -> Foreign Scan on public.ft1 t1 + Output: t1.c3 + Remote SQL: SELECT c3 FROM "S 1"."T 1" WHERE (("C 1" = public.postgres_fdw_abs(c2))) +(11 rows) + +-- Subquery in FROM clause having aggregate +explain (verbose, costs off) +select count(*), x.b from ft1, (select c2 a, sum(c1) b from ft1 group by c2) x where ft1.c2 = x.a group by x.b order by 1, 2; + QUERY PLAN +------------------------------------------------------------------------------------------------ + Sort + Output: (count(*)), x.b + Sort Key: (count(*)), x.b + -> HashAggregate + Output: count(*), x.b + Group Key: x.b + -> Hash Join + Output: x.b + Hash Cond: (ft1.c2 = x.a) + -> Foreign Scan on public.ft1 + Output: ft1.c2 + Remote SQL: SELECT c2 FROM "S 1"."T 1" + -> Hash + Output: x.b, x.a + -> Subquery Scan on x + Output: x.b, x.a + -> Foreign Scan + Output: ft1_1.c2, (sum(ft1_1.c1)) + Relations: Aggregate on (public.ft1) + Remote SQL: SELECT c2, sum("C 1") FROM "S 1"."T 1" GROUP BY c2 +(20 rows) + +select count(*), x.b from ft1, (select c2 a, sum(c1) b from ft1 group by c2) x where ft1.c2 = x.a group by x.b order by 1, 2; + count | b +-------+------- + 100 | 49600 + 100 | 49700 + 100 | 49800 + 100 | 49900 + 100 | 50000 + 100 | 50100 + 100 | 50200 + 100 | 50300 + 100 | 50400 + 100 | 50500 +(10 rows) + +-- FULL join with IS NULL check in HAVING +explain (verbose, costs off) +select avg(t1.c1), sum(t2.c1) from ft4 t1 full join ft5 t2 on (t1.c1 = t2.c1) group by t2.c1 having (avg(t1.c1) is null and sum(t2.c1) < 10) or sum(t2.c1) is null order by 1 nulls last, 2; + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Sort + Output: (avg(t1.c1)), (sum(t2.c1)), t2.c1 + Sort Key: (avg(t1.c1)), (sum(t2.c1)) + -> Foreign Scan + Output: (avg(t1.c1)), (sum(t2.c1)), t2.c1 + Relations: Aggregate on ((public.ft4 t1) FULL JOIN (public.ft5 t2)) + Remote SQL: SELECT avg(r1.c1), sum(r2.c1), r2.c1 FROM ("S 1"."T 3" r1 FULL JOIN "S 1"."T 4" r2 ON (((r1.c1 = r2.c1)))) GROUP BY r2.c1 HAVING ((((avg(r1.c1) IS NULL) AND (sum(r2.c1) < 10)) OR (sum(r2.c1) IS NULL))) +(7 rows) + +select avg(t1.c1), sum(t2.c1) from ft4 t1 full join ft5 t2 on (t1.c1 = t2.c1) group by t2.c1 having (avg(t1.c1) is null and sum(t2.c1) < 10) or sum(t2.c1) is null order by 1 nulls last, 2; + avg | sum +---------------------+----- + 51.0000000000000000 | + | 3 + | 9 +(3 rows) + +-- ORDER BY expression is part of the target list but not pushed down to +-- foreign server. +explain (verbose, costs off) +select sum(c2) * (random() <= 1)::int as sum from ft1 order by 1; + QUERY PLAN +-------------------------------------------------------------------------------- + Sort + Output: (((sum(c2)) * ((random() <= '1'::double precision))::integer)) + Sort Key: (((sum(ft1.c2)) * ((random() <= '1'::double precision))::integer)) + -> Foreign Scan + Output: ((sum(c2)) * ((random() <= '1'::double precision))::integer) + Relations: Aggregate on (public.ft1) + Remote SQL: SELECT sum(c2) FROM "S 1"."T 1" +(7 rows) + +select sum(c2) * (random() <= 1)::int as sum from ft1 order by 1; + sum +------ + 4500 +(1 row) + +-- LATERAL join, with parameterization +set enable_hashagg to false; +explain (verbose, costs off) +select c2, sum from "S 1"."T 1" t1, lateral (select sum(t2.c1 + t1."C 1") sum from ft2 t2 group by t2.c1) qry where t1.c2 * 2 = qry.sum and t1.c2 < 10 order by 1; + QUERY PLAN +---------------------------------------------------------------------------------------------------------- + Sort + Output: t1.c2, qry.sum + Sort Key: t1.c2 + -> Nested Loop + Output: t1.c2, qry.sum + -> Seq Scan on "S 1"."T 1" t1 + Output: t1."C 1", t1.c2, t1.c3, t1.c4, t1.c5, t1.c6, t1.c7, t1.c8 + Filter: (t1.c2 < 10) + -> Subquery Scan on qry + Output: qry.sum, t2.c1 + Filter: ((t1.c2 * 2) = qry.sum) + -> Foreign Scan + Output: (sum((t2.c1 + t1."C 1"))), t2.c1 + Relations: Aggregate on (public.ft2 t2) + Remote SQL: SELECT sum(("C 1" + $1::integer)), "C 1" FROM "S 1"."T 1" GROUP BY "C 1" +(15 rows) + +select c2, sum from "S 1"."T 1" t1, lateral (select sum(t2.c1 + t1."C 1") sum from ft2 t2 group by t2.c1) qry where t1.c2 * 2 = qry.sum and t1.c2 < 10 order by 1; + c2 | sum +----+----- + 1 | 2 + 2 | 4 + 3 | 6 + 4 | 8 + 5 | 10 + 6 | 12 + 7 | 14 + 8 | 16 + 9 | 18 +(9 rows) + +reset enable_hashagg; +-- Check with placeHolderVars +explain (verbose, costs off) +select q.b, count(ft4.c1), sum(q.a) from ft4 left join (select min(13), avg(ft1.c1), sum(ft2.c1) from ft1 right join ft2 on (ft1.c1 = ft2.c1) where ft1.c1 = 12) q(a, b, c) on (ft4.c1 = q.b) where ft4.c1 between 10 and 15 group by q.b order by 1 nulls last, 2; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Sort + Output: q.b, (count(ft4.c1)), (sum(q.a)) + Sort Key: q.b, (count(ft4.c1)) + -> GroupAggregate + Output: q.b, count(ft4.c1), sum(q.a) + Group Key: q.b + -> Sort + Output: q.b, ft4.c1, q.a + Sort Key: q.b + -> Hash Left Join + Output: q.b, ft4.c1, q.a + Hash Cond: ((ft4.c1)::numeric = q.b) + -> Foreign Scan on public.ft4 + Output: ft4.c1, ft4.c2, ft4.c3 + Remote SQL: SELECT c1 FROM "S 1"."T 3" WHERE ((c1 >= 10)) AND ((c1 <= 15)) + -> Hash + Output: q.b, q.a + -> Subquery Scan on q + Output: q.b, q.a + -> Foreign Scan + Output: (min(13)), (avg(ft1.c1)), (NULL::bigint) + Relations: Aggregate on ((public.ft1) INNER JOIN (public.ft2)) + Remote SQL: SELECT min(13), avg(r1."C 1"), NULL::bigint FROM ("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (((r2."C 1" = 12)) AND ((r1."C 1" = 12)))) +(23 rows) + +select q.b, count(ft4.c1), sum(q.a) from ft4 left join (select min(13), avg(ft1.c1), sum(ft2.c1) from ft1 right join ft2 on (ft1.c1 = ft2.c1) where ft1.c1 = 12) q(a, b, c) on (ft4.c1 = q.b) where ft4.c1 between 10 and 15 group by q.b order by 1 nulls last, 2; + b | count | sum +---------------------+-------+----- + 12.0000000000000000 | 1 | 13 + | 2 | +(2 rows) + +-- Not supported cases +-- Grouping sets +explain (verbose, costs off) +select c2, sum(c1) from ft1 where c2 < 3 group by rollup(c2) order by 1 nulls last; + QUERY PLAN +--------------------------------------------------------------------------------------------------- + GroupAggregate + Output: c2, sum(c1) + Group Key: ft1.c2 + Group Key: () + -> Foreign Scan on public.ft1 + Output: c2, c1 + Remote SQL: SELECT "C 1", c2 FROM "S 1"."T 1" WHERE ((c2 < 3)) ORDER BY c2 ASC NULLS LAST +(7 rows) + +select c2, sum(c1) from ft1 where c2 < 3 group by rollup(c2) order by 1 nulls last; + c2 | sum +----+-------- + 0 | 50500 + 1 | 49600 + 2 | 49700 + | 149800 +(4 rows) + +explain (verbose, costs off) +select c2, sum(c1) from ft1 where c2 < 3 group by cube(c2) order by 1 nulls last; + QUERY PLAN +--------------------------------------------------------------------------------------------------- + GroupAggregate + Output: c2, sum(c1) + Group Key: ft1.c2 + Group Key: () + -> Foreign Scan on public.ft1 + Output: c2, c1 + Remote SQL: SELECT "C 1", c2 FROM "S 1"."T 1" WHERE ((c2 < 3)) ORDER BY c2 ASC NULLS LAST +(7 rows) + +select c2, sum(c1) from ft1 where c2 < 3 group by cube(c2) order by 1 nulls last; + c2 | sum +----+-------- + 0 | 50500 + 1 | 49600 + 2 | 49700 + | 149800 +(4 rows) + +explain (verbose, costs off) +select c2, c6, sum(c1) from ft1 where c2 < 3 group by grouping sets(c2, c6) order by 1 nulls last, 2 nulls last; + QUERY PLAN +------------------------------------------------------------------------------------------------------------- + Sort + Output: c2, c6, (sum(c1)) + Sort Key: ft1.c2, ft1.c6 + -> GroupAggregate + Output: c2, c6, sum(c1) + Group Key: ft1.c2 + Sort Key: ft1.c6 + Group Key: ft1.c6 + -> Foreign Scan on public.ft1 + Output: c2, c6, c1 + Remote SQL: SELECT "C 1", c2, c6 FROM "S 1"."T 1" WHERE ((c2 < 3)) ORDER BY c2 ASC NULLS LAST +(11 rows) + +select c2, c6, sum(c1) from ft1 where c2 < 3 group by grouping sets(c2, c6) order by 1 nulls last, 2 nulls last; + c2 | c6 | sum +----+----+------- + 0 | | 50500 + 1 | | 49600 + 2 | | 49700 + | 0 | 50500 + | 1 | 49600 + | 2 | 49700 +(6 rows) + +explain (verbose, costs off) +select c2, sum(c1), grouping(c2) from ft1 where c2 < 3 group by c2 order by 1 nulls last; + QUERY PLAN +------------------------------------------------------------------------------ + Sort + Output: c2, (sum(c1)), (GROUPING(c2)) + Sort Key: ft1.c2 + -> HashAggregate + Output: c2, sum(c1), GROUPING(c2) + Group Key: ft1.c2 + -> Foreign Scan on public.ft1 + Output: c2, c1 + Remote SQL: SELECT "C 1", c2 FROM "S 1"."T 1" WHERE ((c2 < 3)) +(9 rows) + +select c2, sum(c1), grouping(c2) from ft1 where c2 < 3 group by c2 order by 1 nulls last; + c2 | sum | grouping +----+-------+---------- + 0 | 50500 | 0 + 1 | 49600 | 0 + 2 | 49700 | 0 +(3 rows) + +-- DISTINCT itself is not pushed down, whereas underneath aggregate is pushed +explain (verbose, costs off) +select distinct sum(c1)/1000 s from ft2 where c2 < 6 group by c2 order by 1; + QUERY PLAN +-------------------------------------------------------------------------------------------------------- + Unique + Output: ((sum(c1) / 1000)), c2 + -> Sort + Output: ((sum(c1) / 1000)), c2 + Sort Key: ((sum(ft2.c1) / 1000)) + -> Foreign Scan + Output: ((sum(c1) / 1000)), c2 + Relations: Aggregate on (public.ft2) + Remote SQL: SELECT (sum("C 1") / 1000), c2 FROM "S 1"."T 1" WHERE ((c2 < 6)) GROUP BY c2 +(9 rows) + +select distinct sum(c1)/1000 s from ft2 where c2 < 6 group by c2 order by 1; + s +---- + 49 + 50 +(2 rows) + +-- WindowAgg +explain (verbose, costs off) +select c2, sum(c2), count(c2) over (partition by c2%2) from ft2 where c2 < 10 group by c2 order by 1; + QUERY PLAN +------------------------------------------------------------------------------------------------------------- + Sort + Output: c2, (sum(c2)), (count(c2) OVER (?)), ((c2 % 2)) + Sort Key: ft2.c2 + -> WindowAgg + Output: c2, (sum(c2)), count(c2) OVER (?), ((c2 % 2)) + -> Sort + Output: c2, ((c2 % 2)), (sum(c2)) + Sort Key: ((ft2.c2 % 2)) + -> Foreign Scan + Output: c2, ((c2 % 2)), (sum(c2)) + Relations: Aggregate on (public.ft2) + Remote SQL: SELECT c2, (c2 % 2), sum(c2) FROM "S 1"."T 1" WHERE ((c2 < 10)) GROUP BY c2 +(12 rows) + +select c2, sum(c2), count(c2) over (partition by c2%2) from ft2 where c2 < 10 group by c2 order by 1; + c2 | sum | count +----+-----+------- + 0 | 0 | 5 + 1 | 100 | 5 + 2 | 200 | 5 + 3 | 300 | 5 + 4 | 400 | 5 + 5 | 500 | 5 + 6 | 600 | 5 + 7 | 700 | 5 + 8 | 800 | 5 + 9 | 900 | 5 +(10 rows) + +explain (verbose, costs off) +select c2, array_agg(c2) over (partition by c2%2 order by c2 desc) from ft1 where c2 < 10 group by c2 order by 1; + QUERY PLAN +---------------------------------------------------------------------------------------------------- + Sort + Output: c2, (array_agg(c2) OVER (?)), ((c2 % 2)) + Sort Key: ft1.c2 + -> WindowAgg + Output: c2, array_agg(c2) OVER (?), ((c2 % 2)) + -> Sort + Output: c2, ((c2 % 2)) + Sort Key: ((ft1.c2 % 2)), ft1.c2 DESC + -> Foreign Scan + Output: c2, ((c2 % 2)) + Relations: Aggregate on (public.ft1) + Remote SQL: SELECT c2, (c2 % 2) FROM "S 1"."T 1" WHERE ((c2 < 10)) GROUP BY c2 +(12 rows) + +select c2, array_agg(c2) over (partition by c2%2 order by c2 desc) from ft1 where c2 < 10 group by c2 order by 1; + c2 | array_agg +----+------------- + 0 | {8,6,4,2,0} + 1 | {9,7,5,3,1} + 2 | {8,6,4,2} + 3 | {9,7,5,3} + 4 | {8,6,4} + 5 | {9,7,5} + 6 | {8,6} + 7 | {9,7} + 8 | {8} + 9 | {9} +(10 rows) + +explain (verbose, costs off) +select c2, array_agg(c2) over (partition by c2%2 order by c2 range between current row and unbounded following) from ft1 where c2 < 10 group by c2 order by 1; + QUERY PLAN +---------------------------------------------------------------------------------------------------- + Sort + Output: c2, (array_agg(c2) OVER (?)), ((c2 % 2)) + Sort Key: ft1.c2 + -> WindowAgg + Output: c2, array_agg(c2) OVER (?), ((c2 % 2)) + -> Sort + Output: c2, ((c2 % 2)) + Sort Key: ((ft1.c2 % 2)), ft1.c2 + -> Foreign Scan + Output: c2, ((c2 % 2)) + Relations: Aggregate on (public.ft1) + Remote SQL: SELECT c2, (c2 % 2) FROM "S 1"."T 1" WHERE ((c2 < 10)) GROUP BY c2 +(12 rows) + +select c2, array_agg(c2) over (partition by c2%2 order by c2 range between current row and unbounded following) from ft1 where c2 < 10 group by c2 order by 1; + c2 | array_agg +----+------------- + 0 | {0,2,4,6,8} + 1 | {1,3,5,7,9} + 2 | {2,4,6,8} + 3 | {3,5,7,9} + 4 | {4,6,8} + 5 | {5,7,9} + 6 | {6,8} + 7 | {7,9} + 8 | {8} + 9 | {9} +(10 rows) + -- =================================================================== -- parameterized queries -- =================================================================== @@ -2608,6 +3736,9 @@ CONTEXT: column "c8" of foreign table "ft1" SELECT ft1.c1, ft2.c2, ft1 FROM ft1, ft2 WHERE ft1.c1 = ft2.c1 AND ft1.c1 = 1; -- ERROR ERROR: invalid input syntax for integer: "foo" CONTEXT: whole-row reference to foreign table "ft1" +SELECT sum(c2), array_agg(c8) FROM ft1 GROUP BY c8; -- ERROR +ERROR: invalid input syntax for integer: "foo" +CONTEXT: processing expression at position 2 in select list ALTER FOREIGN TABLE ft1 ALTER COLUMN c8 TYPE user_enum; -- =================================================================== -- subtransaction @@ -4441,12 +5572,12 @@ SELECT * FROM ft1 ORDER BY c6 ASC NULLS FIRST, c1 OFFSET 15 LIMIT 10; -- Consistent check constraints provide consistent results ALTER FOREIGN TABLE ft1 ADD CONSTRAINT ft1_c2positive CHECK (c2 >= 0); EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 WHERE c2 < 0; - QUERY PLAN -------------------------------------------------------------------- - Aggregate - Output: count(*) - -> Foreign Scan on public.ft1 - Remote SQL: SELECT NULL FROM "S 1"."T 1" WHERE ((c2 < 0)) + QUERY PLAN +----------------------------------------------------------------- + Foreign Scan + Output: (count(*)) + Relations: Aggregate on (public.ft1) + Remote SQL: SELECT count(*) FROM "S 1"."T 1" WHERE ((c2 < 0)) (4 rows) SELECT count(*) FROM ft1 WHERE c2 < 0; @@ -4485,12 +5616,12 @@ ALTER FOREIGN TABLE ft1 DROP CONSTRAINT ft1_c2positive; -- But inconsistent check constraints provide inconsistent results ALTER FOREIGN TABLE ft1 ADD CONSTRAINT ft1_c2negative CHECK (c2 < 0); EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 WHERE c2 >= 0; - QUERY PLAN --------------------------------------------------------------------- - Aggregate - Output: count(*) - -> Foreign Scan on public.ft1 - Remote SQL: SELECT NULL FROM "S 1"."T 1" WHERE ((c2 >= 0)) + QUERY PLAN +------------------------------------------------------------------ + Foreign Scan + Output: (count(*)) + Relations: Aggregate on (public.ft1) + Remote SQL: SELECT count(*) FROM "S 1"."T 1" WHERE ((c2 >= 0)) (4 rows) SELECT count(*) FROM ft1 WHERE c2 >= 0; diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index daf0438532..906d6e6abd 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -25,6 +25,7 @@ #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" #include "optimizer/cost.h" +#include "optimizer/clauses.h" #include "optimizer/pathnode.h" #include "optimizer/paths.h" #include "optimizer/planmain.h" @@ -38,6 +39,7 @@ #include "utils/memutils.h" #include "utils/rel.h" #include "utils/sampling.h" +#include "utils/selfuncs.h" PG_MODULE_MAGIC; @@ -343,6 +345,10 @@ static void postgresGetForeignJoinPaths(PlannerInfo *root, JoinPathExtraData *extra); static bool postgresRecheckForeignScan(ForeignScanState *node, TupleTableSlot *slot); +static void postgresGetForeignUpperPaths(PlannerInfo *root, + UpperRelationKind stage, + RelOptInfo *input_rel, + RelOptInfo *output_rel); /* * Helper functions @@ -400,11 +406,15 @@ static void conversion_error_callback(void *arg); static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel, JoinPathExtraData *extra); +static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel); static List *get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel); static List *get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel); static void add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel, Path *epq_path); +static void add_foreign_grouping_paths(PlannerInfo *root, + RelOptInfo *input_rel, + RelOptInfo *grouped_rel); /* @@ -455,6 +465,9 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) /* Support functions for join push-down */ routine->GetForeignJoinPaths = postgresGetForeignJoinPaths; + /* Support functions for upper relation push-down */ + routine->GetForeignUpperPaths = postgresGetForeignUpperPaths; + PG_RETURN_POINTER(routine); } @@ -1120,7 +1133,7 @@ postgresGetForeignPlan(PlannerInfo *root, * rel->baserestrictinfo + parameterization clauses through * scan_clauses. For a join rel->baserestrictinfo is NIL and we are * not considering parameterization right now, so there should be no - * scan_clauses for a joinrel. + * scan_clauses for a joinrel and upper rel either. */ Assert(!scan_clauses); } @@ -1170,7 +1183,8 @@ postgresGetForeignPlan(PlannerInfo *root, local_exprs = lappend(local_exprs, rinfo->clause); } - if (foreignrel->reloptkind == RELOPT_JOINREL) + if (foreignrel->reloptkind == RELOPT_JOINREL || + foreignrel->reloptkind == RELOPT_UPPER_REL) { /* For a join relation, get the conditions from fdw_private structure */ remote_conds = fpinfo->remote_conds; @@ -1191,6 +1205,13 @@ postgresGetForeignPlan(PlannerInfo *root, { ListCell *lc; + /* + * Right now, we only consider grouping and aggregation beyond + * joins. Queries involving aggregates or grouping do not require + * EPQ mechanism, hence should not have an outer plan here. + */ + Assert(foreignrel->reloptkind != RELOPT_UPPER_REL); + outer_plan->targetlist = fdw_scan_tlist; foreach(lc, local_exprs) @@ -1228,7 +1249,8 @@ postgresGetForeignPlan(PlannerInfo *root, remote_conds, retrieved_attrs, makeInteger(fpinfo->fetch_size)); - if (foreignrel->reloptkind == RELOPT_JOINREL) + if (foreignrel->reloptkind == RELOPT_JOINREL || + foreignrel->reloptkind == RELOPT_UPPER_REL) fdw_private = lappend(fdw_private, makeString(fpinfo->relation_name->data)); @@ -1280,8 +1302,9 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) /* * Identify which user to do the remote access as. This should match what - * ExecCheckRTEPerms() does. In case of a join, use the lowest-numbered - * member RTE as a representative; we would get the same result from any. + * ExecCheckRTEPerms() does. In case of a join or aggregate, use the + * lowest-numbered member RTE as a representative; we would get the same + * result from any. */ if (fsplan->scan.scanrelid > 0) rtindex = fsplan->scan.scanrelid; @@ -2452,7 +2475,8 @@ postgresExplainDirectModify(ForeignScanState *node, ExplainState *es) /* * estimate_path_cost_size * Get cost and size estimates for a foreign scan on given foreign relation - * either a base relation or a join between foreign relations. + * either a base relation or a join between foreign relations or an upper + * relation containing foreign relations. * * param_join_conds are the parameterization clauses with outer relations. * pathkeys specify the expected sort order if any for given path being costed. @@ -2505,7 +2529,8 @@ estimate_path_cost_size(PlannerInfo *root, &remote_param_join_conds, &local_param_join_conds); /* Build the list of columns to be fetched from the foreign server. */ - if (foreignrel->reloptkind == RELOPT_JOINREL) + if (foreignrel->reloptkind == RELOPT_JOINREL || + foreignrel->reloptkind == RELOPT_UPPER_REL) fdw_scan_tlist = build_tlist_to_deparse(foreignrel); else fdw_scan_tlist = NIL; @@ -2586,25 +2611,7 @@ estimate_path_cost_size(PlannerInfo *root, startup_cost = fpinfo->rel_startup_cost; run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost; } - else if (foreignrel->reloptkind != RELOPT_JOINREL) - { - /* Clamp retrieved rows estimates to at most foreignrel->tuples. */ - retrieved_rows = Min(retrieved_rows, foreignrel->tuples); - - /* - * Cost as though this were a seqscan, which is pessimistic. We - * effectively imagine the local_conds are being evaluated - * remotely, too. - */ - startup_cost = 0; - run_cost = 0; - run_cost += seq_page_cost * foreignrel->pages; - - startup_cost += foreignrel->baserestrictcost.startup; - cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple; - run_cost += cpu_per_tuple * foreignrel->tuples; - } - else + else if (foreignrel->reloptkind == RELOPT_JOINREL) { PgFdwRelationInfo *fpinfo_i; PgFdwRelationInfo *fpinfo_o; @@ -2670,6 +2677,99 @@ estimate_path_cost_size(PlannerInfo *root, run_cost += nrows * remote_conds_cost.per_tuple; run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows; } + else if (foreignrel->reloptkind == RELOPT_UPPER_REL) + { + PgFdwRelationInfo *ofpinfo; + PathTarget *ptarget = root->upper_targets[UPPERREL_GROUP_AGG]; + AggClauseCosts aggcosts; + double input_rows; + int numGroupCols; + double numGroups = 1; + + /* + * This cost model is mixture of costing done for sorted and + * hashed aggregates in cost_agg(). We are not sure which + * strategy will be considered at remote side, thus for + * simplicity, we put all startup related costs in startup_cost + * and all finalization and run cost are added in total_cost. + * + * Also, core does not care about costing HAVING expressions and + * adding that to the costs. So similarly, here too we are not + * considering remote and local conditions for costing. + */ + + ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private; + + /* Get rows and width from input rel */ + input_rows = ofpinfo->rows; + width = ofpinfo->width; + + /* Collect statistics about aggregates for estimating costs. */ + MemSet(&aggcosts, 0, sizeof(AggClauseCosts)); + if (root->parse->hasAggs) + { + get_agg_clause_costs(root, (Node *) fpinfo->grouped_tlist, + AGGSPLIT_SIMPLE, &aggcosts); + get_agg_clause_costs(root, (Node *) root->parse->havingQual, + AGGSPLIT_SIMPLE, &aggcosts); + } + + /* Get number of grouping columns and possible number of groups */ + numGroupCols = list_length(root->parse->groupClause); + numGroups = estimate_num_groups(root, + get_sortgrouplist_exprs(root->parse->groupClause, + fpinfo->grouped_tlist), + input_rows, NULL); + + /* + * Number of rows expected from foreign server will be same as + * that of number of groups. + */ + rows = retrieved_rows = numGroups; + + /*----- + * Startup cost includes: + * 1. Startup cost for underneath input * relation + * 2. Cost of performing aggregation, per cost_agg() + * 3. Startup cost for PathTarget eval + *----- + */ + startup_cost = ofpinfo->rel_startup_cost; + startup_cost += aggcosts.transCost.startup; + startup_cost += aggcosts.transCost.per_tuple * input_rows; + startup_cost += (cpu_operator_cost * numGroupCols) * input_rows; + startup_cost += ptarget->cost.startup; + + /*----- + * Run time cost includes: + * 1. Run time cost of underneath input relation + * 2. Run time cost of performing aggregation, per cost_agg() + * 3. PathTarget eval cost for each output row + *----- + */ + run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost; + run_cost += aggcosts.finalCost * numGroups; + run_cost += cpu_tuple_cost * numGroups; + run_cost += ptarget->cost.per_tuple * numGroups; + } + else + { + /* Clamp retrieved rows estimates to at most foreignrel->tuples. */ + retrieved_rows = Min(retrieved_rows, foreignrel->tuples); + + /* + * Cost as though this were a seqscan, which is pessimistic. We + * effectively imagine the local_conds are being evaluated + * remotely, too. + */ + startup_cost = 0; + run_cost = 0; + run_cost += seq_page_cost * foreignrel->pages; + + startup_cost += foreignrel->baserestrictcost.startup; + cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple; + run_cost += cpu_per_tuple * foreignrel->tuples; + } /* * Without remote estimates, we have no real way to estimate the cost @@ -4342,6 +4442,318 @@ postgresGetForeignJoinPaths(PlannerInfo *root, /* XXX Consider parameterized paths for the join relation */ } +/* + * Assess whether the aggregation, grouping and having operations can be pushed + * down to the foreign server. As a side effect, save information we obtain in + * this function to PgFdwRelationInfo of the input relation. + */ +static bool +foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel) +{ + Query *query = root->parse; + PathTarget *grouping_target; + PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private; + PgFdwRelationInfo *ofpinfo; + List *aggvars; + ListCell *lc; + int i; + List *tlist = NIL; + + /* Grouping Sets are not pushable */ + if (query->groupingSets) + return false; + + /* Get the fpinfo of the underlying scan relation. */ + ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private; + + /* + * If underneath input relation has any local conditions, those conditions + * are required to be applied before performing aggregation. Hence the + * aggregate cannot be pushed down. + */ + if (ofpinfo->local_conds) + return false; + + /* + * The targetlist expected from this node and the targetlist pushed down + * to the foreign server may be different. The latter requires + * sortgrouprefs to be set to push down GROUP BY clause, but should not + * have those arising from ORDER BY clause. These sortgrouprefs may be + * different from those in the plan's targetlist. Use a copy of path + * target to record the new sortgrouprefs. + */ + grouping_target = copy_pathtarget(root->upper_targets[UPPERREL_GROUP_AGG]); + + /* + * Evaluate grouping targets and check whether they are safe to push down + * to the foreign side. All GROUP BY expressions will be part of the + * grouping target and thus there is no need to evaluate it separately. + * While doing so, add required expressions into target list which can + * then be used to pass to foreign server. + */ + i = 0; + foreach(lc, grouping_target->exprs) + { + Expr *expr = (Expr *) lfirst(lc); + Index sgref = get_pathtarget_sortgroupref(grouping_target, i); + ListCell *l; + + /* Check whether this expression is part of GROUP BY clause */ + if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause)) + { + /* + * If any of the GROUP BY expression is not shippable we can not + * push down aggregation to the foreign server. + */ + if (!is_foreign_expr(root, grouped_rel, expr)) + return false; + + /* Pushable, add to tlist */ + tlist = add_to_flat_tlist(tlist, list_make1(expr)); + } + else + { + /* Check entire expression whether it is pushable or not */ + if (is_foreign_expr(root, grouped_rel, expr)) + { + /* Pushable, add to tlist */ + tlist = add_to_flat_tlist(tlist, list_make1(expr)); + } + else + { + /* + * If we have sortgroupref set, then it means that we have an + * ORDER BY entry pointing to this expression. Since we are + * not pushing ORDER BY with GROUP BY, clear it. + */ + if (sgref) + grouping_target->sortgrouprefs[i] = 0; + + /* Not matched exactly, pull the var with aggregates then */ + aggvars = pull_var_clause((Node *) expr, + PVC_INCLUDE_AGGREGATES); + + if (!is_foreign_expr(root, grouped_rel, (Expr *) aggvars)) + return false; + + /* + * Add aggregates, if any, into the targetlist. Plain var + * nodes should be either same as some GROUP BY expression or + * part of some GROUP BY expression. In later case, the query + * cannot refer plain var nodes without the surrounding + * expression. In both the cases, they are already part of + * the targetlist and thus no need to add them again. In fact + * adding pulled plain var nodes in SELECT clause will cause + * an error on the foreign server if they are not same as some + * GROUP BY expression. + */ + foreach(l, aggvars) + { + Expr *expr = (Expr *) lfirst(l); + + if (IsA(expr, Aggref)) + tlist = add_to_flat_tlist(tlist, list_make1(expr)); + } + } + } + + i++; + } + + /* + * Classify the pushable and non-pushable having clauses and save them in + * remote_conds and local_conds of the grouped rel's fpinfo. + */ + if (root->hasHavingQual && query->havingQual) + { + ListCell *lc; + + foreach(lc, (List *) query->havingQual) + { + Expr *expr = (Expr *) lfirst(lc); + + if (!is_foreign_expr(root, grouped_rel, expr)) + fpinfo->local_conds = lappend(fpinfo->local_conds, expr); + else + fpinfo->remote_conds = lappend(fpinfo->remote_conds, expr); + } + } + + /* + * If there are any local conditions, pull Vars and aggregates from it and + * check whether they are safe to pushdown or not. + */ + if (fpinfo->local_conds) + { + ListCell *lc; + List *aggvars = pull_var_clause((Node *) fpinfo->local_conds, + PVC_INCLUDE_AGGREGATES); + + foreach(lc, aggvars) + { + Expr *expr = (Expr *) lfirst(lc); + + /* + * If aggregates within local conditions are not safe to push + * down, then we cannot push down the query. Vars are already + * part of GROUP BY clause which are checked above, so no need to + * access them again here. + */ + if (IsA(expr, Aggref)) + { + if (!is_foreign_expr(root, grouped_rel, expr)) + return false; + + tlist = add_to_flat_tlist(tlist, aggvars); + } + } + } + + /* Transfer any sortgroupref data to the replacement tlist */ + apply_pathtarget_labeling_to_tlist(tlist, grouping_target); + + /* Store generated targetlist */ + fpinfo->grouped_tlist = tlist; + + /* Safe to pushdown */ + fpinfo->pushdown_safe = true; + + /* + * If user is willing to estimate cost for a scan using EXPLAIN, he + * intends to estimate scans on that relation more accurately. Then, it + * makes sense to estimate the cost of the grouping on that relation more + * accurately using EXPLAIN. + */ + fpinfo->use_remote_estimate = ofpinfo->use_remote_estimate; + + /* Copy startup and tuple cost as is from underneath input rel's fpinfo */ + fpinfo->fdw_startup_cost = ofpinfo->fdw_startup_cost; + fpinfo->fdw_tuple_cost = ofpinfo->fdw_tuple_cost; + + /* + * Set cached relation costs to some negative value, so that we can detect + * when they are set to some sensible costs, during one (usually the + * first) of the calls to estimate_path_cost_size(). + */ + fpinfo->rel_startup_cost = -1; + fpinfo->rel_total_cost = -1; + + /* Set fetch size same as that of underneath input rel's fpinfo */ + fpinfo->fetch_size = ofpinfo->fetch_size; + + /* + * Set the string describing this grouped relation to be used in EXPLAIN + * output of corresponding ForeignScan. + */ + fpinfo->relation_name = makeStringInfo(); + appendStringInfo(fpinfo->relation_name, "Aggregate on (%s)", + ofpinfo->relation_name->data); + + return true; +} + +/* + * postgresGetForeignUpperPaths + * Add paths for post-join operations like aggregation, grouping etc. if + * corresponding operations are safe to push down. + * + * Right now, we only support aggregate, grouping and having clause pushdown. + */ +static void +postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage, + RelOptInfo *input_rel, RelOptInfo *output_rel) +{ + PgFdwRelationInfo *fpinfo; + + /* + * If input rel is not safe to pushdown, then simply return as we cannot + * perform any post-join operations on the foreign server. + */ + if (!input_rel->fdw_private || + !((PgFdwRelationInfo *) input_rel->fdw_private)->pushdown_safe) + return; + + /* Ignore stages we don't support; and skip any duplicate calls. */ + if (stage != UPPERREL_GROUP_AGG || output_rel->fdw_private) + return; + + fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo)); + fpinfo->pushdown_safe = false; + output_rel->fdw_private = fpinfo; + + add_foreign_grouping_paths(root, input_rel, output_rel); +} + +/* + * add_foreign_grouping_paths + * Add foreign path for grouping and/or aggregation. + * + * Given input_rel represents the underlying scan. The paths are added to the + * given grouped_rel. + */ +static void +add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel, + RelOptInfo *grouped_rel) +{ + Query *parse = root->parse; + PgFdwRelationInfo *ifpinfo = input_rel->fdw_private; + PgFdwRelationInfo *fpinfo = grouped_rel->fdw_private; + ForeignPath *grouppath; + PathTarget *grouping_target; + double rows; + int width; + Cost startup_cost; + Cost total_cost; + + /* Nothing to be done, if there is no grouping or aggregation required. */ + if (!parse->groupClause && !parse->groupingSets && !parse->hasAggs && + !root->hasHavingQual) + return; + + grouping_target = root->upper_targets[UPPERREL_GROUP_AGG]; + + /* save the input_rel as outerrel in fpinfo */ + fpinfo->outerrel = input_rel; + + /* + * Copy foreign table, foreign server, user mapping, shippable extensions + * etc. details from the input relation's fpinfo. + */ + fpinfo->table = ifpinfo->table; + fpinfo->server = ifpinfo->server; + fpinfo->user = ifpinfo->user; + fpinfo->shippable_extensions = ifpinfo->shippable_extensions; + + /* Assess if it is safe to push down aggregation and grouping. */ + if (!foreign_grouping_ok(root, grouped_rel)) + return; + + /* Estimate the cost of push down */ + estimate_path_cost_size(root, grouped_rel, NIL, NIL, &rows, + &width, &startup_cost, &total_cost); + + /* Now update this information in the fpinfo */ + fpinfo->rows = rows; + fpinfo->width = width; + fpinfo->startup_cost = startup_cost; + fpinfo->total_cost = total_cost; + + /* Create and add foreign path to the grouping relation. */ + grouppath = create_foreignscan_path(root, + grouped_rel, + grouping_target, + rows, + startup_cost, + total_cost, + NIL, /* no pathkeys */ + NULL, /* no required_outer */ + NULL, + NULL); /* no fdw_private */ + + /* Add generated path into grouped_rel by add_path(). */ + add_path(grouped_rel, (Path *) grouppath); +} + /* * Create a tuple from the specified row of the PGresult. * @@ -4549,24 +4961,34 @@ conversion_error_callback(void *arg) ForeignScan *fsplan = (ForeignScan *) fsstate->ss.ps.plan; EState *estate = fsstate->ss.ps.state; TargetEntry *tle; - Var *var; - RangeTblEntry *rte; Assert(IsA(fsplan, ForeignScan)); tle = (TargetEntry *) list_nth(fsplan->fdw_scan_tlist, errpos->cur_attno - 1); Assert(IsA(tle, TargetEntry)); - var = (Var *) tle->expr; - Assert(IsA(var, Var)); - rte = rt_fetch(var->varno, estate->es_range_table); + /* + * Target list can have Vars and expressions. For Vars, we can get + * it's relation, however for expressions we can't. Thus for + * expressions, just show generic context message. + */ + if (IsA(tle->expr, Var)) + { + RangeTblEntry *rte; + Var *var = (Var *) tle->expr; - if (var->varattno == 0) - is_wholerow = true; + rte = rt_fetch(var->varno, estate->es_range_table); + + if (var->varattno == 0) + is_wholerow = true; + else + attname = get_relid_attribute_name(rte->relid, var->varattno); + + relname = get_rel_name(rte->relid); + } else - attname = get_relid_attribute_name(rte->relid, var->varattno); - - relname = get_rel_name(rte->relid); + errcontext("processing expression at position %d in select list", + errpos->cur_attno); } if (relname) diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index 67126bc421..f8c255ef9b 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -92,6 +92,9 @@ typedef struct PgFdwRelationInfo RelOptInfo *innerrel; JoinType jointype; List *joinclauses; + + /* Grouping information */ + List *grouped_tlist; } PgFdwRelationInfo; /* in postgres_fdw.c */ @@ -155,7 +158,7 @@ extern void deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs); extern void deparseStringLiteral(StringInfo buf, const char *val); extern Expr *find_em_expr_for_rel(EquivalenceClass *ec, RelOptInfo *rel); -extern List *build_tlist_to_deparse(RelOptInfo *foreign_rel); +extern List *build_tlist_to_deparse(RelOptInfo *foreignrel); extern void deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root, RelOptInfo *foreignrel, List *tlist, List *remote_conds, List *pathkeys, diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 4f68e8904e..aeed8f62de 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -541,6 +541,310 @@ ALTER VIEW v4 OWNER TO regress_view_owner; DROP OWNED BY regress_view_owner; DROP ROLE regress_view_owner; + +-- =================================================================== +-- Aggregate and grouping queries +-- =================================================================== + +-- Simple aggregates +explain (verbose, costs off) +select count(c6), sum(c1), avg(c1), min(c2), max(c1), stddev(c2), sum(c1) * (random() <= 1)::int as sum2 from ft1 where c2 < 5 group by c2 order by 1, 2; +select count(c6), sum(c1), avg(c1), min(c2), max(c1), stddev(c2), sum(c1) * (random() <= 1)::int as sum2 from ft1 where c2 < 5 group by c2 order by 1, 2; + +-- Aggregate is not pushed down as aggregation contains random() +explain (verbose, costs off) +select sum(c1 * (random() <= 1)::int) as sum, avg(c1) from ft1; + +-- Aggregate over join query +explain (verbose, costs off) +select count(*), sum(t1.c1), avg(t2.c1) from ft1 t1 inner join ft1 t2 on (t1.c2 = t2.c2) where t1.c2 = 6; +select count(*), sum(t1.c1), avg(t2.c1) from ft1 t1 inner join ft1 t2 on (t1.c2 = t2.c2) where t1.c2 = 6; + +-- Not pushed down due to local conditions present in underneath input rel +explain (verbose, costs off) +select sum(t1.c1), count(t2.c1) from ft1 t1 inner join ft2 t2 on (t1.c1 = t2.c1) where ((t1.c1 * t2.c1)/(t1.c1 * t2.c1)) * random() <= 1; + +-- GROUP BY clause having expressions +explain (verbose, costs off) +select c2/2, sum(c2) * (c2/2) from ft1 group by c2/2 order by c2/2; +select c2/2, sum(c2) * (c2/2) from ft1 group by c2/2 order by c2/2; + +-- Aggregates in subquery are pushed down. +explain (verbose, costs off) +select count(x.a), sum(x.a) from (select c2 a, sum(c1) b from ft1 group by c2, sqrt(c1) order by 1, 2) x; +select count(x.a), sum(x.a) from (select c2 a, sum(c1) b from ft1 group by c2, sqrt(c1) order by 1, 2) x; + +-- Aggregate is still pushed down by taking unshippable expression out +explain (verbose, costs off) +select c2 * (random() <= 1)::int as sum1, sum(c1) * c2 as sum2 from ft1 group by c2 order by 1, 2; +select c2 * (random() <= 1)::int as sum1, sum(c1) * c2 as sum2 from ft1 group by c2 order by 1, 2; + +-- Aggregate with unshippable GROUP BY clause are not pushed +explain (verbose, costs off) +select c2 * (random() <= 1)::int as c2 from ft2 group by c2 * (random() <= 1)::int order by 1; + +-- GROUP BY clause in various forms, cardinal, alias and constant expression +explain (verbose, costs off) +select count(c2) w, c2 x, 5 y, 7.0 z from ft1 group by 2, y, 9.0::int order by 2; +select count(c2) w, c2 x, 5 y, 7.0 z from ft1 group by 2, y, 9.0::int order by 2; + +-- Testing HAVING clause shippability +explain (verbose, costs off) +select c2, sum(c1) from ft2 group by c2 having avg(c1) < 500 and sum(c1) < 49800 order by c2; +select c2, sum(c1) from ft2 group by c2 having avg(c1) < 500 and sum(c1) < 49800 order by c2; + +-- Using expressions in HAVING clause +explain (verbose, costs off) +select c5, count(c2) from ft1 group by c5, sqrt(c2) having sqrt(max(c2)) = sqrt(2) order by 1, 2; +select c5, count(c2) from ft1 group by c5, sqrt(c2) having sqrt(max(c2)) = sqrt(2) order by 1, 2; + +-- Unshippable HAVING clause will be evaluated locally, and other qual in HAVING clause is pushed down +explain (verbose, costs off) +select count(*) from (select c5, count(c1) from ft1 group by c5, sqrt(c2) having (avg(c1) / avg(c1)) * random() <= 1 and avg(c1) < 500) x; +select count(*) from (select c5, count(c1) from ft1 group by c5, sqrt(c2) having (avg(c1) / avg(c1)) * random() <= 1 and avg(c1) < 500) x; + +-- Aggregate in HAVING clause is not pushable, and thus aggregation is not pushed down +explain (verbose, costs off) +select sum(c1) from ft1 group by c2 having avg(c1 * (random() <= 1)::int) > 100 order by 1; + + +-- Testing ORDER BY, DISTINCT, FILTER, Ordered-sets and VARIADIC within aggregates + +-- ORDER BY within aggregate, same column used to order +explain (verbose, costs off) +select array_agg(c1 order by c1) from ft1 where c1 < 100 group by c2 order by 1; +select array_agg(c1 order by c1) from ft1 where c1 < 100 group by c2 order by 1; + +-- ORDER BY within aggregate, different column used to order also using DESC +explain (verbose, costs off) +select array_agg(c5 order by c1 desc) from ft2 where c2 = 6 and c1 < 50; +select array_agg(c5 order by c1 desc) from ft2 where c2 = 6 and c1 < 50; + +-- DISTINCT within aggregate +explain (verbose, costs off) +select array_agg(distinct (t1.c1)%5) from ft4 t1 full join ft5 t2 on (t1.c1 = t2.c1) where t1.c1 < 20 or (t1.c1 is null and t2.c1 < 5) group by (t2.c1)%3 order by 1; +select array_agg(distinct (t1.c1)%5) from ft4 t1 full join ft5 t2 on (t1.c1 = t2.c1) where t1.c1 < 20 or (t1.c1 is null and t2.c1 < 5) group by (t2.c1)%3 order by 1; + +-- DISTINCT combined with ORDER BY within aggregate +explain (verbose, costs off) +select array_agg(distinct (t1.c1)%5 order by (t1.c1)%5) from ft4 t1 full join ft5 t2 on (t1.c1 = t2.c1) where t1.c1 < 20 or (t1.c1 is null and t2.c1 < 5) group by (t2.c1)%3 order by 1; +select array_agg(distinct (t1.c1)%5 order by (t1.c1)%5) from ft4 t1 full join ft5 t2 on (t1.c1 = t2.c1) where t1.c1 < 20 or (t1.c1 is null and t2.c1 < 5) group by (t2.c1)%3 order by 1; + +explain (verbose, costs off) +select array_agg(distinct (t1.c1)%5 order by (t1.c1)%5 desc nulls last) from ft4 t1 full join ft5 t2 on (t1.c1 = t2.c1) where t1.c1 < 20 or (t1.c1 is null and t2.c1 < 5) group by (t2.c1)%3 order by 1; +select array_agg(distinct (t1.c1)%5 order by (t1.c1)%5 desc nulls last) from ft4 t1 full join ft5 t2 on (t1.c1 = t2.c1) where t1.c1 < 20 or (t1.c1 is null and t2.c1 < 5) group by (t2.c1)%3 order by 1; + +-- FILTER within aggregate +explain (verbose, costs off) +select sum(c1) filter (where c1 < 100 and c2 > 5) from ft1 group by c2 order by 1 nulls last; +select sum(c1) filter (where c1 < 100 and c2 > 5) from ft1 group by c2 order by 1 nulls last; + +-- DISTINCT, ORDER BY and FILTER within aggregate +explain (verbose, costs off) +select sum(c1%3), sum(distinct c1%3 order by c1%3) filter (where c1%3 < 2), c2 from ft1 where c2 = 6 group by c2; +select sum(c1%3), sum(distinct c1%3 order by c1%3) filter (where c1%3 < 2), c2 from ft1 where c2 = 6 group by c2; + +-- Outer query is aggregation query +explain (verbose, costs off) +select distinct (select count(*) filter (where t2.c2 = 6 and t2.c1 < 10) from ft1 t1 where t1.c1 = 6) from ft2 t2 order by 1; +select distinct (select count(*) filter (where t2.c2 = 6 and t2.c1 < 10) from ft1 t1 where t1.c1 = 6) from ft2 t2 order by 1; +-- Inner query is aggregation query +explain (verbose, costs off) +select distinct (select count(t1.c1) filter (where t2.c2 = 6 and t2.c1 < 10) from ft1 t1 where t1.c1 = 6) from ft2 t2 order by 1; +select distinct (select count(t1.c1) filter (where t2.c2 = 6 and t2.c1 < 10) from ft1 t1 where t1.c1 = 6) from ft2 t2 order by 1; + +-- Aggregate not pushed down as FILTER condition is not pushable +explain (verbose, costs off) +select sum(c1) filter (where (c1 / c1) * random() <= 1) from ft1 group by c2 order by 1; +explain (verbose, costs off) +select sum(c2) filter (where c2 in (select c2 from ft1 where c2 < 5)) from ft1; + +-- Ordered-sets within aggregate +explain (verbose, costs off) +select c2, rank('10'::varchar) within group (order by c6), percentile_cont(c2/10::numeric) within group (order by c1) from ft1 where c2 < 10 group by c2 having percentile_cont(c2/10::numeric) within group (order by c1) < 500 order by c2; +select c2, rank('10'::varchar) within group (order by c6), percentile_cont(c2/10::numeric) within group (order by c1) from ft1 where c2 < 10 group by c2 having percentile_cont(c2/10::numeric) within group (order by c1) < 500 order by c2; + +-- Using multiple arguments within aggregates +explain (verbose, costs off) +select c1, rank(c1, c2) within group (order by c1, c2) from ft1 group by c1, c2 having c1 = 6 order by 1; +select c1, rank(c1, c2) within group (order by c1, c2) from ft1 group by c1, c2 having c1 = 6 order by 1; + +-- User defined function for user defined aggregate, VARIADIC +create function least_accum(anyelement, variadic anyarray) +returns anyelement language sql as + 'select least($1, min($2[i])) from generate_subscripts($2,1) g(i)'; +create aggregate least_agg(variadic items anyarray) ( + stype = anyelement, sfunc = least_accum +); + +-- Not pushed down due to user defined aggregate +explain (verbose, costs off) +select c2, least_agg(c1) from ft1 group by c2 order by c2; + +-- Add function and aggregate into extension +alter extension postgres_fdw add function least_accum(anyelement, variadic anyarray); +alter extension postgres_fdw add aggregate least_agg(variadic items anyarray); +alter server loopback options (set extensions 'postgres_fdw'); + +-- Now aggregate will be pushed. Aggregate will display VARIADIC argument. +explain (verbose, costs off) +select c2, least_agg(c1) from ft1 where c2 < 100 group by c2 order by c2; +select c2, least_agg(c1) from ft1 where c2 < 100 group by c2 order by c2; + +-- Remove function and aggregate from extension +alter extension postgres_fdw drop function least_accum(anyelement, variadic anyarray); +alter extension postgres_fdw drop aggregate least_agg(variadic items anyarray); +alter server loopback options (set extensions 'postgres_fdw'); + +-- Not pushed down as we have dropped objects from extension. +explain (verbose, costs off) +select c2, least_agg(c1) from ft1 group by c2 order by c2; + +-- Cleanup +drop aggregate least_agg(variadic items anyarray); +drop function least_accum(anyelement, variadic anyarray); + + +-- Testing USING OPERATOR() in ORDER BY within aggregate. +-- For this, we need user defined operators along with operator family and +-- operator class. Create those and then add them in extension. Note that +-- user defined objects are considered unshippable unless they are part of +-- the extension. +create operator public.<^ ( + leftarg = int4, + rightarg = int4, + procedure = int4eq +); + +create operator public.=^ ( + leftarg = int4, + rightarg = int4, + procedure = int4lt +); + +create operator public.>^ ( + leftarg = int4, + rightarg = int4, + procedure = int4gt +); + +create operator family my_op_family using btree; + +create function my_op_cmp(a int, b int) returns int as + $$begin return btint4cmp(a, b); end $$ language plpgsql; + +create operator class my_op_class for type int using btree family my_op_family as + operator 1 public.<^, + operator 3 public.=^, + operator 5 public.>^, + function 1 my_op_cmp(int, int); + +-- This will not be pushed as user defined sort operator is not part of the +-- extension yet. +explain (verbose, costs off) +select array_agg(c1 order by c1 using operator(public.<^)) from ft2 where c2 = 6 and c1 < 100 group by c2; + +-- Add into extension +alter extension postgres_fdw add operator class my_op_class using btree; +alter extension postgres_fdw add function my_op_cmp(a int, b int); +alter extension postgres_fdw add operator family my_op_family using btree; +alter extension postgres_fdw add operator public.<^(int, int); +alter extension postgres_fdw add operator public.=^(int, int); +alter extension postgres_fdw add operator public.>^(int, int); +alter server loopback options (set extensions 'postgres_fdw'); + +-- Now this will be pushed as sort operator is part of the extension. +explain (verbose, costs off) +select array_agg(c1 order by c1 using operator(public.<^)) from ft2 where c2 = 6 and c1 < 100 group by c2; +select array_agg(c1 order by c1 using operator(public.<^)) from ft2 where c2 = 6 and c1 < 100 group by c2; + +-- Remove from extension +alter extension postgres_fdw drop operator class my_op_class using btree; +alter extension postgres_fdw drop function my_op_cmp(a int, b int); +alter extension postgres_fdw drop operator family my_op_family using btree; +alter extension postgres_fdw drop operator public.<^(int, int); +alter extension postgres_fdw drop operator public.=^(int, int); +alter extension postgres_fdw drop operator public.>^(int, int); +alter server loopback options (set extensions 'postgres_fdw'); + +-- This will not be pushed as sort operator is now removed from the extension. +explain (verbose, costs off) +select array_agg(c1 order by c1 using operator(public.<^)) from ft2 where c2 = 6 and c1 < 100 group by c2; + +-- Cleanup +drop operator class my_op_class using btree; +drop function my_op_cmp(a int, b int); +drop operator family my_op_family using btree; +drop operator public.>^(int, int); +drop operator public.=^(int, int); +drop operator public.<^(int, int); + +-- Input relation to aggregate push down hook is not safe to pushdown and thus +-- the aggregate cannot be pushed down to foreign server. +explain (verbose, costs off) +select count(t1.c3) from ft1 t1, ft1 t2 where t1.c1 = postgres_fdw_abs(t1.c2); + +-- Subquery in FROM clause having aggregate +explain (verbose, costs off) +select count(*), x.b from ft1, (select c2 a, sum(c1) b from ft1 group by c2) x where ft1.c2 = x.a group by x.b order by 1, 2; +select count(*), x.b from ft1, (select c2 a, sum(c1) b from ft1 group by c2) x where ft1.c2 = x.a group by x.b order by 1, 2; + +-- FULL join with IS NULL check in HAVING +explain (verbose, costs off) +select avg(t1.c1), sum(t2.c1) from ft4 t1 full join ft5 t2 on (t1.c1 = t2.c1) group by t2.c1 having (avg(t1.c1) is null and sum(t2.c1) < 10) or sum(t2.c1) is null order by 1 nulls last, 2; +select avg(t1.c1), sum(t2.c1) from ft4 t1 full join ft5 t2 on (t1.c1 = t2.c1) group by t2.c1 having (avg(t1.c1) is null and sum(t2.c1) < 10) or sum(t2.c1) is null order by 1 nulls last, 2; + +-- ORDER BY expression is part of the target list but not pushed down to +-- foreign server. +explain (verbose, costs off) +select sum(c2) * (random() <= 1)::int as sum from ft1 order by 1; +select sum(c2) * (random() <= 1)::int as sum from ft1 order by 1; + +-- LATERAL join, with parameterization +set enable_hashagg to false; +explain (verbose, costs off) +select c2, sum from "S 1"."T 1" t1, lateral (select sum(t2.c1 + t1."C 1") sum from ft2 t2 group by t2.c1) qry where t1.c2 * 2 = qry.sum and t1.c2 < 10 order by 1; +select c2, sum from "S 1"."T 1" t1, lateral (select sum(t2.c1 + t1."C 1") sum from ft2 t2 group by t2.c1) qry where t1.c2 * 2 = qry.sum and t1.c2 < 10 order by 1; +reset enable_hashagg; + +-- Check with placeHolderVars +explain (verbose, costs off) +select q.b, count(ft4.c1), sum(q.a) from ft4 left join (select min(13), avg(ft1.c1), sum(ft2.c1) from ft1 right join ft2 on (ft1.c1 = ft2.c1) where ft1.c1 = 12) q(a, b, c) on (ft4.c1 = q.b) where ft4.c1 between 10 and 15 group by q.b order by 1 nulls last, 2; +select q.b, count(ft4.c1), sum(q.a) from ft4 left join (select min(13), avg(ft1.c1), sum(ft2.c1) from ft1 right join ft2 on (ft1.c1 = ft2.c1) where ft1.c1 = 12) q(a, b, c) on (ft4.c1 = q.b) where ft4.c1 between 10 and 15 group by q.b order by 1 nulls last, 2; + + +-- Not supported cases +-- Grouping sets +explain (verbose, costs off) +select c2, sum(c1) from ft1 where c2 < 3 group by rollup(c2) order by 1 nulls last; +select c2, sum(c1) from ft1 where c2 < 3 group by rollup(c2) order by 1 nulls last; +explain (verbose, costs off) +select c2, sum(c1) from ft1 where c2 < 3 group by cube(c2) order by 1 nulls last; +select c2, sum(c1) from ft1 where c2 < 3 group by cube(c2) order by 1 nulls last; +explain (verbose, costs off) +select c2, c6, sum(c1) from ft1 where c2 < 3 group by grouping sets(c2, c6) order by 1 nulls last, 2 nulls last; +select c2, c6, sum(c1) from ft1 where c2 < 3 group by grouping sets(c2, c6) order by 1 nulls last, 2 nulls last; +explain (verbose, costs off) +select c2, sum(c1), grouping(c2) from ft1 where c2 < 3 group by c2 order by 1 nulls last; +select c2, sum(c1), grouping(c2) from ft1 where c2 < 3 group by c2 order by 1 nulls last; + +-- DISTINCT itself is not pushed down, whereas underneath aggregate is pushed +explain (verbose, costs off) +select distinct sum(c1)/1000 s from ft2 where c2 < 6 group by c2 order by 1; +select distinct sum(c1)/1000 s from ft2 where c2 < 6 group by c2 order by 1; + +-- WindowAgg +explain (verbose, costs off) +select c2, sum(c2), count(c2) over (partition by c2%2) from ft2 where c2 < 10 group by c2 order by 1; +select c2, sum(c2), count(c2) over (partition by c2%2) from ft2 where c2 < 10 group by c2 order by 1; +explain (verbose, costs off) +select c2, array_agg(c2) over (partition by c2%2 order by c2 desc) from ft1 where c2 < 10 group by c2 order by 1; +select c2, array_agg(c2) over (partition by c2%2 order by c2 desc) from ft1 where c2 < 10 group by c2 order by 1; +explain (verbose, costs off) +select c2, array_agg(c2) over (partition by c2%2 order by c2 range between current row and unbounded following) from ft1 where c2 < 10 group by c2 order by 1; +select c2, array_agg(c2) over (partition by c2%2 order by c2 range between current row and unbounded following) from ft1 where c2 < 10 group by c2 order by 1; + + -- =================================================================== -- parameterized queries -- =================================================================== @@ -624,6 +928,7 @@ ALTER FOREIGN TABLE ft1 ALTER COLUMN c8 TYPE int; SELECT * FROM ft1 WHERE c1 = 1; -- ERROR SELECT ft1.c1, ft2.c2, ft1.c8 FROM ft1, ft2 WHERE ft1.c1 = ft2.c1 AND ft1.c1 = 1; -- ERROR SELECT ft1.c1, ft2.c2, ft1 FROM ft1, ft2 WHERE ft1.c1 = ft2.c1 AND ft1.c1 = 1; -- ERROR +SELECT sum(c2), array_agg(c8) FROM ft1 GROUP BY c8; -- ERROR ALTER FOREIGN TABLE ft1 ALTER COLUMN c8 TYPE user_enum; -- =================================================================== diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 47158f6468..ad49674684 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -3243,8 +3243,15 @@ create_foreignscan_plan(PlannerInfo *root, ForeignPath *best_path, /* Copy foreign server OID; likewise, no need to make FDW do this */ scan_plan->fs_server = rel->serverid; - /* Likewise, copy the relids that are represented by this foreign scan */ - scan_plan->fs_relids = best_path->path.parent->relids; + /* + * Likewise, copy the relids that are represented by this foreign scan. An + * upper rel doesn't have relids set, but it covers all the base relations + * participating in the underlying scan, so use root's all_baserels. + */ + if (rel->reloptkind == RELOPT_UPPER_REL) + scan_plan->fs_relids = root->all_baserels; + else + scan_plan->fs_relids = best_path->path.parent->relids; /* * If this is a foreign join, and to make it valid to push down we had to