postgres_fdw: Push down aggregates to remote servers.

Now that the upper planner uses paths, and now that we have proper hooks
to inject paths into the upper planning process, it's possible for
foreign data wrappers to arrange to push aggregates to the remote side
instead of fetching all of the rows and aggregating them locally.  This
figures to be a massive win for performance, so teach postgres_fdw to
do it.

Jeevan Chalke and Ashutosh Bapat.  Reviewed by Ashutosh Bapat with
additional testing by Prabhat Sahu.  Various mostly cosmetic changes
by me.
This commit is contained in:
Robert Haas 2016-10-21 09:54:29 -04:00
parent 709e461bef
commit 7012b132d0
6 changed files with 2452 additions and 150 deletions

View File

@ -38,6 +38,7 @@
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/sysattr.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_collation.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_operator.h"
@ -56,6 +57,7 @@
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/typcache.h"
/*
@ -65,6 +67,8 @@ typedef struct foreign_glob_cxt
{
PlannerInfo *root; /* global planner state */
RelOptInfo *foreignrel; /* the foreign relation we are planning for */
Relids relids; /* relids of base relations in the underlying
* scan */
} foreign_glob_cxt;
/*
@ -94,6 +98,9 @@ typedef struct deparse_expr_cxt
{
PlannerInfo *root; /* global planner state */
RelOptInfo *foreignrel; /* the foreign relation we are planning for */
RelOptInfo *scanrel; /* the underlying scan relation. Same as
* foreignrel, when that represents a join or
* a base relation. */
StringInfo buf; /* output buffer to append to */
List **params_list; /* exprs that will become remote Params */
} deparse_expr_cxt;
@ -135,7 +142,7 @@ static void deparseColumnRef(StringInfo buf, int varno, int varattno,
static void deparseRelation(StringInfo buf, Relation rel);
static void deparseExpr(Expr *expr, deparse_expr_cxt *context);
static void deparseVar(Var *node, deparse_expr_cxt *context);
static void deparseConst(Const *node, deparse_expr_cxt *context);
static void deparseConst(Const *node, deparse_expr_cxt *context, int showtype);
static void deparseParam(Param *node, deparse_expr_cxt *context);
static void deparseArrayRef(ArrayRef *node, deparse_expr_cxt *context);
static void deparseFuncExpr(FuncExpr *node, deparse_expr_cxt *context);
@ -159,6 +166,14 @@ static void appendOrderByClause(List *pathkeys, deparse_expr_cxt *context);
static void appendConditions(List *exprs, deparse_expr_cxt *context);
static void deparseFromExprForRel(StringInfo buf, PlannerInfo *root,
RelOptInfo *joinrel, bool use_alias, List **params_list);
static void deparseFromExpr(List *quals, deparse_expr_cxt *context);
static void deparseAggref(Aggref *node, deparse_expr_cxt *context);
static void appendGroupByClause(List *tlist, deparse_expr_cxt *context);
static void appendAggOrderBy(List *orderList, List *targetList,
deparse_expr_cxt *context);
static void appendFunctionName(Oid funcid, deparse_expr_cxt *context);
static Node *deparseSortGroupClause(Index ref, List *tlist,
deparse_expr_cxt *context);
/*
@ -200,6 +215,7 @@ is_foreign_expr(PlannerInfo *root,
{
foreign_glob_cxt glob_cxt;
foreign_loc_cxt loc_cxt;
PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) (baserel->fdw_private);
/*
* Check that the expression consists of nodes that are safe to execute
@ -207,6 +223,16 @@ is_foreign_expr(PlannerInfo *root,
*/
glob_cxt.root = root;
glob_cxt.foreignrel = baserel;
/*
* For an upper relation, use relids from its underneath scan relation,
* because the upperrel's own relids currently aren't set to anything
* meaningful by the core code. For other relation, use their own relids.
*/
if (baserel->reloptkind == RELOPT_UPPER_REL)
glob_cxt.relids = fpinfo->outerrel->relids;
else
glob_cxt.relids = baserel->relids;
loc_cxt.collation = InvalidOid;
loc_cxt.state = FDW_COLLATE_NONE;
if (!foreign_expr_walker((Node *) expr, &glob_cxt, &loc_cxt))
@ -281,7 +307,7 @@ foreign_expr_walker(Node *node,
* Param's collation, ie it's not safe for it to have a
* non-default collation.
*/
if (bms_is_member(var->varno, glob_cxt->foreignrel->relids) &&
if (bms_is_member(var->varno, glob_cxt->relids) &&
var->varlevelsup == 0)
{
/* Var belongs to foreign table */
@ -631,6 +657,106 @@ foreign_expr_walker(Node *node,
check_type = false;
}
break;
case T_Aggref:
{
Aggref *agg = (Aggref *) node;
ListCell *lc;
/* Not safe to pushdown when not in grouping context */
if (glob_cxt->foreignrel->reloptkind != RELOPT_UPPER_REL)
return false;
/* Only non-split aggregates are pushable. */
if (agg->aggsplit != AGGSPLIT_SIMPLE)
return false;
/* As usual, it must be shippable. */
if (!is_shippable(agg->aggfnoid, ProcedureRelationId, fpinfo))
return false;
/*
* Recurse to input args. aggdirectargs, aggorder and
* aggdistinct are all present in args, so no need to check
* their shippability explicitly.
*/
foreach(lc, agg->args)
{
Node *n = (Node *) lfirst(lc);
/* If TargetEntry, extract the expression from it */
if (IsA(n, TargetEntry))
{
TargetEntry *tle = (TargetEntry *) n;
n = (Node *) tle->expr;
}
if (!foreign_expr_walker(n, glob_cxt, &inner_cxt))
return false;
}
/*
* For aggorder elements, check whether the sort operator, if
* specified, is shippable or not.
*/
if (agg->aggorder)
{
ListCell *lc;
foreach(lc, agg->aggorder)
{
SortGroupClause *srt = (SortGroupClause *) lfirst(lc);
Oid sortcoltype;
TypeCacheEntry *typentry;
TargetEntry *tle;
tle = get_sortgroupref_tle(srt->tleSortGroupRef,
agg->args);
sortcoltype = exprType((Node *) tle->expr);
typentry = lookup_type_cache(sortcoltype,
TYPECACHE_LT_OPR | TYPECACHE_GT_OPR);
/* Check shippability of non-default sort operator. */
if (srt->sortop != typentry->lt_opr &&
srt->sortop != typentry->gt_opr &&
!is_shippable(srt->sortop, OperatorRelationId,
fpinfo))
return false;
}
}
/* Check aggregate filter */
if (!foreign_expr_walker((Node *) agg->aggfilter,
glob_cxt, &inner_cxt))
return false;
/*
* If aggregate's input collation is not derived from a
* foreign Var, it can't be sent to remote.
*/
if (agg->inputcollid == InvalidOid)
/* OK, inputs are all noncollatable */ ;
else if (inner_cxt.state != FDW_COLLATE_SAFE ||
agg->inputcollid != inner_cxt.collation)
return false;
/*
* Detect whether node is introducing a collation not derived
* from a foreign Var. (If so, we just mark it unsafe for now
* rather than immediately returning false, since the parent
* node might not care.)
*/
collation = agg->aggcollid;
if (collation == InvalidOid)
state = FDW_COLLATE_NONE;
else if (inner_cxt.state == FDW_COLLATE_SAFE &&
collation == inner_cxt.collation)
state = FDW_COLLATE_SAFE;
else if (collation == DEFAULT_COLLATION_OID)
state = FDW_COLLATE_NONE;
else
state = FDW_COLLATE_UNSAFE;
}
break;
default:
/*
@ -720,7 +846,9 @@ deparse_type_name(Oid type_oid, int32 typemod)
* Build the targetlist for given relation to be deparsed as SELECT clause.
*
* The output targetlist contains the columns that need to be fetched from the
* foreign server for the given relation.
* foreign server for the given relation. If foreignrel is an upper relation,
* then the output targetlist can also contains expressions to be evaluated on
* foreign server.
*/
List *
build_tlist_to_deparse(RelOptInfo *foreignrel)
@ -728,6 +856,13 @@ build_tlist_to_deparse(RelOptInfo *foreignrel)
List *tlist = NIL;
PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
/*
* For an upper relation, we have already built the target list while
* checking shippability, so just return that.
*/
if (foreignrel->reloptkind == RELOPT_UPPER_REL)
return fpinfo->grouped_tlist;
/*
* We require columns specified in foreignrel->reltarget->exprs and those
* required for evaluating the local conditions.
@ -749,7 +884,8 @@ build_tlist_to_deparse(RelOptInfo *foreignrel)
* For a base relation fpinfo->attrs_used is used to construct SELECT clause,
* hence the tlist is ignored for a base relation.
*
* remote_conds is the list of conditions to be deparsed as WHERE clause.
* remote_conds is the list of conditions to be deparsed into the WHERE clause
* (or, in the case of upper relations, into the HAVING clause).
*
* If params_list is not NULL, it receives a list of Params and other-relation
* Vars used in the clauses; these values must be transmitted to the remote
@ -768,28 +904,58 @@ deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root, RelOptInfo *rel,
List **retrieved_attrs, List **params_list)
{
deparse_expr_cxt context;
PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
List *quals;
/* We handle relations for foreign tables and joins between those */
/*
* We handle relations for foreign tables, joins between those and upper
* relations.
*/
Assert(rel->reloptkind == RELOPT_JOINREL ||
rel->reloptkind == RELOPT_BASEREL ||
rel->reloptkind == RELOPT_OTHER_MEMBER_REL);
rel->reloptkind == RELOPT_OTHER_MEMBER_REL ||
rel->reloptkind == RELOPT_UPPER_REL);
/* Fill portions of context common to join and base relation */
/* Fill portions of context common to upper, join and base relation */
context.buf = buf;
context.root = root;
context.foreignrel = rel;
context.scanrel = (rel->reloptkind == RELOPT_UPPER_REL) ?
fpinfo->outerrel : rel;
context.params_list = params_list;
/* Construct SELECT clause and FROM clause */
/* Construct SELECT clause */
deparseSelectSql(tlist, retrieved_attrs, &context);
/*
* Construct WHERE clause
* For upper relations, the WHERE clause is built from the remote
* conditions of the underlying scan relation; otherwise, we can use the
* supplied list of remote conditions directly.
*/
if (remote_conds)
if (rel->reloptkind == RELOPT_UPPER_REL)
{
appendStringInfo(buf, " WHERE ");
appendConditions(remote_conds, &context);
PgFdwRelationInfo *ofpinfo;
ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
quals = ofpinfo->remote_conds;
}
else
quals = remote_conds;
/* Construct FROM and WHERE clauses */
deparseFromExpr(quals, &context);
if (rel->reloptkind == RELOPT_UPPER_REL)
{
/* Append GROUP BY clause */
appendGroupByClause(tlist, &context);
/* Append HAVING clause */
if (remote_conds)
{
appendStringInfo(buf, " HAVING ");
appendConditions(remote_conds, &context);
}
}
/* Add ORDER BY clause if we found any useful pathkeys */
@ -803,7 +969,7 @@ deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root, RelOptInfo *rel,
/*
* Construct a simple SELECT statement that retrieves desired columns
* of the specified foreign table, and append it to "buf". The output
* contains just "SELECT ... FROM ....".
* contains just "SELECT ... ".
*
* We also create an integer List of the columns being retrieved, which is
* returned to *retrieved_attrs.
@ -824,7 +990,8 @@ deparseSelectSql(List *tlist, List **retrieved_attrs, deparse_expr_cxt *context)
*/
appendStringInfoString(buf, "SELECT ");
if (foreignrel->reloptkind == RELOPT_JOINREL)
if (foreignrel->reloptkind == RELOPT_JOINREL ||
foreignrel->reloptkind == RELOPT_UPPER_REL)
{
/* For a join relation use the input tlist */
deparseExplicitTargetList(tlist, retrieved_attrs, context);
@ -847,14 +1014,38 @@ deparseSelectSql(List *tlist, List **retrieved_attrs, deparse_expr_cxt *context)
fpinfo->attrs_used, false, retrieved_attrs);
heap_close(rel, NoLock);
}
}
/*
* Construct FROM clause
*/
/*
* Construct a FROM clause and, if needed, a WHERE clause, and append those to
* "buf".
*
* quals is the list of clauses to be included in the WHERE clause.
*/
static void
deparseFromExpr(List *quals, deparse_expr_cxt *context)
{
StringInfo buf = context->buf;
RelOptInfo *foreignrel = context->foreignrel;
RelOptInfo *scanrel = context->scanrel;
/* For upper relations, scanrel must be either a joinrel or a baserel */
Assert(foreignrel->reloptkind != RELOPT_UPPER_REL ||
scanrel->reloptkind == RELOPT_JOINREL ||
scanrel->reloptkind == RELOPT_BASEREL);
/* Construct FROM clause */
appendStringInfoString(buf, " FROM ");
deparseFromExprForRel(buf, root, foreignrel,
(foreignrel->reloptkind == RELOPT_JOINREL),
deparseFromExprForRel(buf, context->root, scanrel,
(bms_num_members(scanrel->relids) > 1),
context->params_list);
/* Construct WHERE clause */
if (quals != NIL)
{
appendStringInfo(buf, " WHERE ");
appendConditions(quals, context);
}
}
/*
@ -957,14 +1148,14 @@ deparseTargetList(StringInfo buf,
/*
* Deparse the appropriate locking clause (FOR UPDATE or FOR SHARE) for a
* given relation (context->foreignrel).
* given relation (context->scanrel).
*/
static void
deparseLockingClause(deparse_expr_cxt *context)
{
StringInfo buf = context->buf;
PlannerInfo *root = context->root;
RelOptInfo *rel = context->foreignrel;
RelOptInfo *rel = context->scanrel;
int relid = -1;
while ((relid = bms_next_member(rel->relids, relid)) >= 0)
@ -1024,7 +1215,7 @@ deparseLockingClause(deparse_expr_cxt *context)
}
/* Add the relation alias if we are here for a join relation */
if (rel->reloptkind == RELOPT_JOINREL &&
if (bms_num_members(rel->relids) > 1 &&
rc->strength != LCS_NONE)
appendStringInfo(buf, " OF %s%d", REL_ALIAS_PREFIX, relid);
}
@ -1036,7 +1227,7 @@ deparseLockingClause(deparse_expr_cxt *context)
* Deparse conditions from the provided list and append them to buf.
*
* The conditions in the list are assumed to be ANDed. This function is used to
* deparse both WHERE clauses and JOIN .. ON clauses.
* deparse WHERE clauses, JOIN .. ON clauses and HAVING clauses.
*/
static void
appendConditions(List *exprs, deparse_expr_cxt *context)
@ -1126,22 +1317,15 @@ deparseExplicitTargetList(List *tlist, List **retrieved_attrs,
foreach(lc, tlist)
{
TargetEntry *tle = (TargetEntry *) lfirst(lc);
Var *var;
/* Extract expression if TargetEntry node */
Assert(IsA(tle, TargetEntry));
var = (Var *) tle->expr;
/* We expect only Var nodes here */
if (!IsA(var, Var))
elog(ERROR, "non-Var not expected in target list");
if (i > 0)
appendStringInfoString(buf, ", ");
deparseVar(var, context);
deparseExpr((Expr *) tle->expr, context);
*retrieved_attrs = lappend_int(*retrieved_attrs, i + 1);
i++;
}
@ -1192,6 +1376,7 @@ deparseFromExprForRel(StringInfo buf, PlannerInfo *root, RelOptInfo *foreignrel,
context.buf = buf;
context.foreignrel = foreignrel;
context.scanrel = foreignrel;
context.root = root;
context.params_list = params_list;
@ -1360,6 +1545,7 @@ deparseDirectUpdateSql(StringInfo buf, PlannerInfo *root,
/* Set up context struct for recursion */
context.root = root;
context.foreignrel = baserel;
context.scanrel = baserel;
context.buf = buf;
context.params_list = params_list;
@ -1444,6 +1630,7 @@ deparseDirectDeleteSql(StringInfo buf, PlannerInfo *root,
/* Set up context struct for recursion */
context.root = root;
context.foreignrel = baserel;
context.scanrel = baserel;
context.buf = buf;
context.params_list = params_list;
@ -1817,7 +2004,7 @@ deparseExpr(Expr *node, deparse_expr_cxt *context)
deparseVar((Var *) node, context);
break;
case T_Const:
deparseConst((Const *) node, context);
deparseConst((Const *) node, context, 0);
break;
case T_Param:
deparseParam((Param *) node, context);
@ -1849,6 +2036,9 @@ deparseExpr(Expr *node, deparse_expr_cxt *context)
case T_ArrayExpr:
deparseArrayExpr((ArrayExpr *) node, context);
break;
case T_Aggref:
deparseAggref((Aggref *) node, context);
break;
default:
elog(ERROR, "unsupported expression type for deparse: %d",
(int) nodeTag(node));
@ -1867,10 +2057,12 @@ deparseExpr(Expr *node, deparse_expr_cxt *context)
static void
deparseVar(Var *node, deparse_expr_cxt *context)
{
bool qualify_col = (context->foreignrel->reloptkind == RELOPT_JOINREL);
Relids relids = context->scanrel->relids;
if (bms_is_member(node->varno, context->foreignrel->relids) &&
node->varlevelsup == 0)
/* Qualify columns when multiple relations are involved. */
bool qualify_col = (bms_num_members(relids) > 1);
if (bms_is_member(node->varno, relids) && node->varlevelsup == 0)
deparseColumnRef(context->buf, node->varno, node->varattno,
context->root, qualify_col);
else
@ -1908,9 +2100,12 @@ deparseVar(Var *node, deparse_expr_cxt *context)
* Deparse given constant value into context->buf.
*
* This function has to be kept in sync with ruleutils.c's get_const_expr.
* As for that function, showtype can be -1 to never show "::typename" decoration,
* or +1 to always show it, or 0 to show it only if the constant wouldn't be assumed
* to be the right type by default.
*/
static void
deparseConst(Const *node, deparse_expr_cxt *context)
deparseConst(Const *node, deparse_expr_cxt *context, int showtype)
{
StringInfo buf = context->buf;
Oid typoutput;
@ -1922,9 +2117,10 @@ deparseConst(Const *node, deparse_expr_cxt *context)
if (node->constisnull)
{
appendStringInfoString(buf, "NULL");
appendStringInfo(buf, "::%s",
deparse_type_name(node->consttype,
node->consttypmod));
if (showtype >= 0)
appendStringInfo(buf, "::%s",
deparse_type_name(node->consttype,
node->consttypmod));
return;
}
@ -1974,9 +2170,14 @@ deparseConst(Const *node, deparse_expr_cxt *context)
break;
}
pfree(extval);
if (showtype < 0)
return;
/*
* Append ::typename unless the constant will be implicitly typed as the
* right type when it is read in.
* For showtype == 0, append ::typename unless the constant will be
* implicitly typed as the right type when it is read in.
*
* XXX this code has to be kept in sync with the behavior of the parser,
* especially make_const.
@ -1995,7 +2196,7 @@ deparseConst(Const *node, deparse_expr_cxt *context)
needlabel = true;
break;
}
if (needlabel)
if (needlabel || showtype > 0)
appendStringInfo(buf, "::%s",
deparse_type_name(node->consttype,
node->consttypmod));
@ -2092,9 +2293,6 @@ static void
deparseFuncExpr(FuncExpr *node, deparse_expr_cxt *context)
{
StringInfo buf = context->buf;
HeapTuple proctup;
Form_pg_proc procform;
const char *proname;
bool use_variadic;
bool first;
ListCell *arg;
@ -2127,29 +2325,15 @@ deparseFuncExpr(FuncExpr *node, deparse_expr_cxt *context)
return;
}
/*
* Normal function: display as proname(args).
*/
proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(node->funcid));
if (!HeapTupleIsValid(proctup))
elog(ERROR, "cache lookup failed for function %u", node->funcid);
procform = (Form_pg_proc) GETSTRUCT(proctup);
/* Check if need to print VARIADIC (cf. ruleutils.c) */
use_variadic = node->funcvariadic;
/* Print schema name only if it's not pg_catalog */
if (procform->pronamespace != PG_CATALOG_NAMESPACE)
{
const char *schemaname;
/*
* Normal function: display as proname(args).
*/
appendFunctionName(node->funcid, context);
appendStringInfoChar(buf, '(');
schemaname = get_namespace_name(procform->pronamespace);
appendStringInfo(buf, "%s.", quote_identifier(schemaname));
}
/* Deparse the function name ... */
proname = NameStr(procform->proname);
appendStringInfo(buf, "%s(", quote_identifier(proname));
/* ... and all the arguments */
first = true;
foreach(arg, node->args)
@ -2162,8 +2346,6 @@ deparseFuncExpr(FuncExpr *node, deparse_expr_cxt *context)
first = false;
}
appendStringInfoChar(buf, ')');
ReleaseSysCache(proctup);
}
/*
@ -2419,6 +2601,152 @@ deparseArrayExpr(ArrayExpr *node, deparse_expr_cxt *context)
deparse_type_name(node->array_typeid, -1));
}
/*
* Deparse an Aggref node.
*/
static void
deparseAggref(Aggref *node, deparse_expr_cxt *context)
{
StringInfo buf = context->buf;
bool use_variadic;
/* Only basic, non-split aggregation accepted. */
Assert(node->aggsplit == AGGSPLIT_SIMPLE);
/* Check if need to print VARIADIC (cf. ruleutils.c) */
use_variadic = node->aggvariadic;
/* Find aggregate name from aggfnoid which is a pg_proc entry */
appendFunctionName(node->aggfnoid, context);
appendStringInfoChar(buf, '(');
/* Add DISTINCT */
appendStringInfo(buf, "%s", (node->aggdistinct != NIL) ? "DISTINCT " : "");
if (AGGKIND_IS_ORDERED_SET(node->aggkind))
{
/* Add WITHIN GROUP (ORDER BY ..) */
ListCell *arg;
bool first = true;
Assert(!node->aggvariadic);
Assert(node->aggorder != NIL);
foreach(arg, node->aggdirectargs)
{
if (!first)
appendStringInfoString(buf, ", ");
first = false;
deparseExpr((Expr *) lfirst(arg), context);
}
appendStringInfoString(buf, ") WITHIN GROUP (ORDER BY ");
appendAggOrderBy(node->aggorder, node->args, context);
}
else
{
/* aggstar can be set only in zero-argument aggregates */
if (node->aggstar)
appendStringInfoChar(buf, '*');
else
{
ListCell *arg;
bool first = true;
/* Add all the arguments */
foreach(arg, node->args)
{
TargetEntry *tle = (TargetEntry *) lfirst(arg);
Node *n = (Node *) tle->expr;
if (tle->resjunk)
continue;
if (!first)
appendStringInfoString(buf, ", ");
first = false;
/* Add VARIADIC */
if (use_variadic && lnext(arg) == NULL)
appendStringInfoString(buf, "VARIADIC ");
deparseExpr((Expr *) n, context);
}
}
/* Add ORDER BY */
if (node->aggorder != NIL)
{
appendStringInfoString(buf, " ORDER BY ");
appendAggOrderBy(node->aggorder, node->args, context);
}
}
/* Add FILTER (WHERE ..) */
if (node->aggfilter != NULL)
{
appendStringInfoString(buf, ") FILTER (WHERE ");
deparseExpr((Expr *) node->aggfilter, context);
}
appendStringInfoChar(buf, ')');
}
/*
* Append ORDER BY within aggregate function.
*/
static void
appendAggOrderBy(List *orderList, List *targetList, deparse_expr_cxt *context)
{
StringInfo buf = context->buf;
ListCell *lc;
bool first = true;
foreach(lc, orderList)
{
SortGroupClause *srt = (SortGroupClause *) lfirst(lc);
Node *sortexpr;
Oid sortcoltype;
TypeCacheEntry *typentry;
if (!first)
appendStringInfoString(buf, ", ");
first = false;
sortexpr = deparseSortGroupClause(srt->tleSortGroupRef, targetList,
context);
sortcoltype = exprType(sortexpr);
/* See whether operator is default < or > for datatype */
typentry = lookup_type_cache(sortcoltype,
TYPECACHE_LT_OPR | TYPECACHE_GT_OPR);
if (srt->sortop == typentry->lt_opr)
appendStringInfoString(buf, " ASC");
else if (srt->sortop == typentry->gt_opr)
appendStringInfoString(buf, " DESC");
else
{
HeapTuple opertup;
Form_pg_operator operform;
appendStringInfoString(buf, " USING ");
/* Append operator name. */
opertup = SearchSysCache1(OPEROID, ObjectIdGetDatum(srt->sortop));
if (!HeapTupleIsValid(opertup))
elog(ERROR, "cache lookup failed for operator %u", srt->sortop);
operform = (Form_pg_operator) GETSTRUCT(opertup);
deparseOperatorName(buf, operform);
ReleaseSysCache(opertup);
}
if (srt->nulls_first)
appendStringInfoString(buf, " NULLS FIRST");
else
appendStringInfoString(buf, " NULLS LAST");
}
}
/*
* Print the representation of a parameter to be sent to the remote side.
*
@ -2463,6 +2791,41 @@ printRemotePlaceholder(Oid paramtype, int32 paramtypmod,
appendStringInfo(buf, "((SELECT null::%s)::%s)", ptypename, ptypename);
}
/*
* Deparse GROUP BY clause.
*/
static void
appendGroupByClause(List *tlist, deparse_expr_cxt *context)
{
StringInfo buf = context->buf;
Query *query = context->root->parse;
ListCell *lc;
bool first = true;
/* Nothing to be done, if there's no GROUP BY clause in the query. */
if (!query->groupClause)
return;
appendStringInfo(buf, " GROUP BY ");
/*
* Queries with grouping sets are not pushed down, so we don't expect
* grouping sets here.
*/
Assert(!query->groupingSets);
foreach(lc, query->groupClause)
{
SortGroupClause *grp = (SortGroupClause *) lfirst(lc);
if (!first)
appendStringInfoString(buf, ", ");
first = false;
deparseSortGroupClause(grp->tleSortGroupRef, tlist, context);
}
}
/*
* Deparse ORDER BY clause according to the given pathkeys for given base
* relation. From given pathkeys expressions belonging entirely to the given
@ -2474,7 +2837,7 @@ appendOrderByClause(List *pathkeys, deparse_expr_cxt *context)
ListCell *lcell;
int nestlevel;
char *delim = " ";
RelOptInfo *baserel = context->foreignrel;
RelOptInfo *baserel = context->scanrel;
StringInfo buf = context->buf;
/* Make sure any constants in the exprs are printed portably */
@ -2505,3 +2868,74 @@ appendOrderByClause(List *pathkeys, deparse_expr_cxt *context)
}
reset_transmission_modes(nestlevel);
}
/*
* appendFunctionName
* Deparses function name from given function oid.
*/
static void
appendFunctionName(Oid funcid, deparse_expr_cxt *context)
{
StringInfo buf = context->buf;
HeapTuple proctup;
Form_pg_proc procform;
const char *proname;
proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(funcid));
if (!HeapTupleIsValid(proctup))
elog(ERROR, "cache lookup failed for function %u", funcid);
procform = (Form_pg_proc) GETSTRUCT(proctup);
/* Print schema name only if it's not pg_catalog */
if (procform->pronamespace != PG_CATALOG_NAMESPACE)
{
const char *schemaname;
schemaname = get_namespace_name(procform->pronamespace);
appendStringInfo(buf, "%s.", quote_identifier(schemaname));
}
/* Always print the function name */
proname = NameStr(procform->proname);
appendStringInfo(buf, "%s", quote_identifier(proname));
ReleaseSysCache(proctup);
}
/*
* Appends a sort or group clause.
*
* Like get_rule_sortgroupclause(), returns the expression tree, so caller
* need not find it again.
*/
static Node *
deparseSortGroupClause(Index ref, List *tlist, deparse_expr_cxt *context)
{
StringInfo buf = context->buf;
TargetEntry *tle;
Expr *expr;
tle = get_sortgroupref_tle(ref, tlist);
expr = tle->expr;
if (expr && IsA(expr, Const))
{
/*
* Force a typecast here so that we don't emit something like "GROUP
* BY 2", which will be misconstrued as a column position rather than
* a constant.
*/
deparseConst((Const *) expr, context, 1);
}
else if (!expr || IsA(expr, Var))
deparseExpr(expr, context);
else
{
/* Always parenthesize the expression. */
appendStringInfoString(buf, "(");
deparseExpr(expr, context);
appendStringInfoString(buf, ")");
}
return (Node *) expr;
}

File diff suppressed because it is too large Load Diff

View File

@ -25,6 +25,7 @@
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/cost.h"
#include "optimizer/clauses.h"
#include "optimizer/pathnode.h"
#include "optimizer/paths.h"
#include "optimizer/planmain.h"
@ -38,6 +39,7 @@
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/sampling.h"
#include "utils/selfuncs.h"
PG_MODULE_MAGIC;
@ -343,6 +345,10 @@ static void postgresGetForeignJoinPaths(PlannerInfo *root,
JoinPathExtraData *extra);
static bool postgresRecheckForeignScan(ForeignScanState *node,
TupleTableSlot *slot);
static void postgresGetForeignUpperPaths(PlannerInfo *root,
UpperRelationKind stage,
RelOptInfo *input_rel,
RelOptInfo *output_rel);
/*
* Helper functions
@ -400,11 +406,15 @@ static void conversion_error_callback(void *arg);
static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
JoinPathExtraData *extra);
static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel);
static List *get_useful_pathkeys_for_relation(PlannerInfo *root,
RelOptInfo *rel);
static List *get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel);
static void add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel,
Path *epq_path);
static void add_foreign_grouping_paths(PlannerInfo *root,
RelOptInfo *input_rel,
RelOptInfo *grouped_rel);
/*
@ -455,6 +465,9 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
/* Support functions for join push-down */
routine->GetForeignJoinPaths = postgresGetForeignJoinPaths;
/* Support functions for upper relation push-down */
routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
PG_RETURN_POINTER(routine);
}
@ -1120,7 +1133,7 @@ postgresGetForeignPlan(PlannerInfo *root,
* rel->baserestrictinfo + parameterization clauses through
* scan_clauses. For a join rel->baserestrictinfo is NIL and we are
* not considering parameterization right now, so there should be no
* scan_clauses for a joinrel.
* scan_clauses for a joinrel and upper rel either.
*/
Assert(!scan_clauses);
}
@ -1170,7 +1183,8 @@ postgresGetForeignPlan(PlannerInfo *root,
local_exprs = lappend(local_exprs, rinfo->clause);
}
if (foreignrel->reloptkind == RELOPT_JOINREL)
if (foreignrel->reloptkind == RELOPT_JOINREL ||
foreignrel->reloptkind == RELOPT_UPPER_REL)
{
/* For a join relation, get the conditions from fdw_private structure */
remote_conds = fpinfo->remote_conds;
@ -1191,6 +1205,13 @@ postgresGetForeignPlan(PlannerInfo *root,
{
ListCell *lc;
/*
* Right now, we only consider grouping and aggregation beyond
* joins. Queries involving aggregates or grouping do not require
* EPQ mechanism, hence should not have an outer plan here.
*/
Assert(foreignrel->reloptkind != RELOPT_UPPER_REL);
outer_plan->targetlist = fdw_scan_tlist;
foreach(lc, local_exprs)
@ -1228,7 +1249,8 @@ postgresGetForeignPlan(PlannerInfo *root,
remote_conds,
retrieved_attrs,
makeInteger(fpinfo->fetch_size));
if (foreignrel->reloptkind == RELOPT_JOINREL)
if (foreignrel->reloptkind == RELOPT_JOINREL ||
foreignrel->reloptkind == RELOPT_UPPER_REL)
fdw_private = lappend(fdw_private,
makeString(fpinfo->relation_name->data));
@ -1280,8 +1302,9 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
/*
* Identify which user to do the remote access as. This should match what
* ExecCheckRTEPerms() does. In case of a join, use the lowest-numbered
* member RTE as a representative; we would get the same result from any.
* ExecCheckRTEPerms() does. In case of a join or aggregate, use the
* lowest-numbered member RTE as a representative; we would get the same
* result from any.
*/
if (fsplan->scan.scanrelid > 0)
rtindex = fsplan->scan.scanrelid;
@ -2452,7 +2475,8 @@ postgresExplainDirectModify(ForeignScanState *node, ExplainState *es)
/*
* estimate_path_cost_size
* Get cost and size estimates for a foreign scan on given foreign relation
* either a base relation or a join between foreign relations.
* either a base relation or a join between foreign relations or an upper
* relation containing foreign relations.
*
* param_join_conds are the parameterization clauses with outer relations.
* pathkeys specify the expected sort order if any for given path being costed.
@ -2505,7 +2529,8 @@ estimate_path_cost_size(PlannerInfo *root,
&remote_param_join_conds, &local_param_join_conds);
/* Build the list of columns to be fetched from the foreign server. */
if (foreignrel->reloptkind == RELOPT_JOINREL)
if (foreignrel->reloptkind == RELOPT_JOINREL ||
foreignrel->reloptkind == RELOPT_UPPER_REL)
fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
else
fdw_scan_tlist = NIL;
@ -2586,25 +2611,7 @@ estimate_path_cost_size(PlannerInfo *root,
startup_cost = fpinfo->rel_startup_cost;
run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost;
}
else if (foreignrel->reloptkind != RELOPT_JOINREL)
{
/* Clamp retrieved rows estimates to at most foreignrel->tuples. */
retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
/*
* Cost as though this were a seqscan, which is pessimistic. We
* effectively imagine the local_conds are being evaluated
* remotely, too.
*/
startup_cost = 0;
run_cost = 0;
run_cost += seq_page_cost * foreignrel->pages;
startup_cost += foreignrel->baserestrictcost.startup;
cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
run_cost += cpu_per_tuple * foreignrel->tuples;
}
else
else if (foreignrel->reloptkind == RELOPT_JOINREL)
{
PgFdwRelationInfo *fpinfo_i;
PgFdwRelationInfo *fpinfo_o;
@ -2670,6 +2677,99 @@ estimate_path_cost_size(PlannerInfo *root,
run_cost += nrows * remote_conds_cost.per_tuple;
run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
}
else if (foreignrel->reloptkind == RELOPT_UPPER_REL)
{
PgFdwRelationInfo *ofpinfo;
PathTarget *ptarget = root->upper_targets[UPPERREL_GROUP_AGG];
AggClauseCosts aggcosts;
double input_rows;
int numGroupCols;
double numGroups = 1;
/*
* This cost model is mixture of costing done for sorted and
* hashed aggregates in cost_agg(). We are not sure which
* strategy will be considered at remote side, thus for
* simplicity, we put all startup related costs in startup_cost
* and all finalization and run cost are added in total_cost.
*
* Also, core does not care about costing HAVING expressions and
* adding that to the costs. So similarly, here too we are not
* considering remote and local conditions for costing.
*/
ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
/* Get rows and width from input rel */
input_rows = ofpinfo->rows;
width = ofpinfo->width;
/* Collect statistics about aggregates for estimating costs. */
MemSet(&aggcosts, 0, sizeof(AggClauseCosts));
if (root->parse->hasAggs)
{
get_agg_clause_costs(root, (Node *) fpinfo->grouped_tlist,
AGGSPLIT_SIMPLE, &aggcosts);
get_agg_clause_costs(root, (Node *) root->parse->havingQual,
AGGSPLIT_SIMPLE, &aggcosts);
}
/* Get number of grouping columns and possible number of groups */
numGroupCols = list_length(root->parse->groupClause);
numGroups = estimate_num_groups(root,
get_sortgrouplist_exprs(root->parse->groupClause,
fpinfo->grouped_tlist),
input_rows, NULL);
/*
* Number of rows expected from foreign server will be same as
* that of number of groups.
*/
rows = retrieved_rows = numGroups;
/*-----
* Startup cost includes:
* 1. Startup cost for underneath input * relation
* 2. Cost of performing aggregation, per cost_agg()
* 3. Startup cost for PathTarget eval
*-----
*/
startup_cost = ofpinfo->rel_startup_cost;
startup_cost += aggcosts.transCost.startup;
startup_cost += aggcosts.transCost.per_tuple * input_rows;
startup_cost += (cpu_operator_cost * numGroupCols) * input_rows;
startup_cost += ptarget->cost.startup;
/*-----
* Run time cost includes:
* 1. Run time cost of underneath input relation
* 2. Run time cost of performing aggregation, per cost_agg()
* 3. PathTarget eval cost for each output row
*-----
*/
run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost;
run_cost += aggcosts.finalCost * numGroups;
run_cost += cpu_tuple_cost * numGroups;
run_cost += ptarget->cost.per_tuple * numGroups;
}
else
{
/* Clamp retrieved rows estimates to at most foreignrel->tuples. */
retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
/*
* Cost as though this were a seqscan, which is pessimistic. We
* effectively imagine the local_conds are being evaluated
* remotely, too.
*/
startup_cost = 0;
run_cost = 0;
run_cost += seq_page_cost * foreignrel->pages;
startup_cost += foreignrel->baserestrictcost.startup;
cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
run_cost += cpu_per_tuple * foreignrel->tuples;
}
/*
* Without remote estimates, we have no real way to estimate the cost
@ -4342,6 +4442,318 @@ postgresGetForeignJoinPaths(PlannerInfo *root,
/* XXX Consider parameterized paths for the join relation */
}
/*
* Assess whether the aggregation, grouping and having operations can be pushed
* down to the foreign server. As a side effect, save information we obtain in
* this function to PgFdwRelationInfo of the input relation.
*/
static bool
foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel)
{
Query *query = root->parse;
PathTarget *grouping_target;
PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private;
PgFdwRelationInfo *ofpinfo;
List *aggvars;
ListCell *lc;
int i;
List *tlist = NIL;
/* Grouping Sets are not pushable */
if (query->groupingSets)
return false;
/* Get the fpinfo of the underlying scan relation. */
ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
/*
* If underneath input relation has any local conditions, those conditions
* are required to be applied before performing aggregation. Hence the
* aggregate cannot be pushed down.
*/
if (ofpinfo->local_conds)
return false;
/*
* The targetlist expected from this node and the targetlist pushed down
* to the foreign server may be different. The latter requires
* sortgrouprefs to be set to push down GROUP BY clause, but should not
* have those arising from ORDER BY clause. These sortgrouprefs may be
* different from those in the plan's targetlist. Use a copy of path
* target to record the new sortgrouprefs.
*/
grouping_target = copy_pathtarget(root->upper_targets[UPPERREL_GROUP_AGG]);
/*
* Evaluate grouping targets and check whether they are safe to push down
* to the foreign side. All GROUP BY expressions will be part of the
* grouping target and thus there is no need to evaluate it separately.
* While doing so, add required expressions into target list which can
* then be used to pass to foreign server.
*/
i = 0;
foreach(lc, grouping_target->exprs)
{
Expr *expr = (Expr *) lfirst(lc);
Index sgref = get_pathtarget_sortgroupref(grouping_target, i);
ListCell *l;
/* Check whether this expression is part of GROUP BY clause */
if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause))
{
/*
* If any of the GROUP BY expression is not shippable we can not
* push down aggregation to the foreign server.
*/
if (!is_foreign_expr(root, grouped_rel, expr))
return false;
/* Pushable, add to tlist */
tlist = add_to_flat_tlist(tlist, list_make1(expr));
}
else
{
/* Check entire expression whether it is pushable or not */
if (is_foreign_expr(root, grouped_rel, expr))
{
/* Pushable, add to tlist */
tlist = add_to_flat_tlist(tlist, list_make1(expr));
}
else
{
/*
* If we have sortgroupref set, then it means that we have an
* ORDER BY entry pointing to this expression. Since we are
* not pushing ORDER BY with GROUP BY, clear it.
*/
if (sgref)
grouping_target->sortgrouprefs[i] = 0;
/* Not matched exactly, pull the var with aggregates then */
aggvars = pull_var_clause((Node *) expr,
PVC_INCLUDE_AGGREGATES);
if (!is_foreign_expr(root, grouped_rel, (Expr *) aggvars))
return false;
/*
* Add aggregates, if any, into the targetlist. Plain var
* nodes should be either same as some GROUP BY expression or
* part of some GROUP BY expression. In later case, the query
* cannot refer plain var nodes without the surrounding
* expression. In both the cases, they are already part of
* the targetlist and thus no need to add them again. In fact
* adding pulled plain var nodes in SELECT clause will cause
* an error on the foreign server if they are not same as some
* GROUP BY expression.
*/
foreach(l, aggvars)
{
Expr *expr = (Expr *) lfirst(l);
if (IsA(expr, Aggref))
tlist = add_to_flat_tlist(tlist, list_make1(expr));
}
}
}
i++;
}
/*
* Classify the pushable and non-pushable having clauses and save them in
* remote_conds and local_conds of the grouped rel's fpinfo.
*/
if (root->hasHavingQual && query->havingQual)
{
ListCell *lc;
foreach(lc, (List *) query->havingQual)
{
Expr *expr = (Expr *) lfirst(lc);
if (!is_foreign_expr(root, grouped_rel, expr))
fpinfo->local_conds = lappend(fpinfo->local_conds, expr);
else
fpinfo->remote_conds = lappend(fpinfo->remote_conds, expr);
}
}
/*
* If there are any local conditions, pull Vars and aggregates from it and
* check whether they are safe to pushdown or not.
*/
if (fpinfo->local_conds)
{
ListCell *lc;
List *aggvars = pull_var_clause((Node *) fpinfo->local_conds,
PVC_INCLUDE_AGGREGATES);
foreach(lc, aggvars)
{
Expr *expr = (Expr *) lfirst(lc);
/*
* If aggregates within local conditions are not safe to push
* down, then we cannot push down the query. Vars are already
* part of GROUP BY clause which are checked above, so no need to
* access them again here.
*/
if (IsA(expr, Aggref))
{
if (!is_foreign_expr(root, grouped_rel, expr))
return false;
tlist = add_to_flat_tlist(tlist, aggvars);
}
}
}
/* Transfer any sortgroupref data to the replacement tlist */
apply_pathtarget_labeling_to_tlist(tlist, grouping_target);
/* Store generated targetlist */
fpinfo->grouped_tlist = tlist;
/* Safe to pushdown */
fpinfo->pushdown_safe = true;
/*
* If user is willing to estimate cost for a scan using EXPLAIN, he
* intends to estimate scans on that relation more accurately. Then, it
* makes sense to estimate the cost of the grouping on that relation more
* accurately using EXPLAIN.
*/
fpinfo->use_remote_estimate = ofpinfo->use_remote_estimate;
/* Copy startup and tuple cost as is from underneath input rel's fpinfo */
fpinfo->fdw_startup_cost = ofpinfo->fdw_startup_cost;
fpinfo->fdw_tuple_cost = ofpinfo->fdw_tuple_cost;
/*
* Set cached relation costs to some negative value, so that we can detect
* when they are set to some sensible costs, during one (usually the
* first) of the calls to estimate_path_cost_size().
*/
fpinfo->rel_startup_cost = -1;
fpinfo->rel_total_cost = -1;
/* Set fetch size same as that of underneath input rel's fpinfo */
fpinfo->fetch_size = ofpinfo->fetch_size;
/*
* Set the string describing this grouped relation to be used in EXPLAIN
* output of corresponding ForeignScan.
*/
fpinfo->relation_name = makeStringInfo();
appendStringInfo(fpinfo->relation_name, "Aggregate on (%s)",
ofpinfo->relation_name->data);
return true;
}
/*
* postgresGetForeignUpperPaths
* Add paths for post-join operations like aggregation, grouping etc. if
* corresponding operations are safe to push down.
*
* Right now, we only support aggregate, grouping and having clause pushdown.
*/
static void
postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage,
RelOptInfo *input_rel, RelOptInfo *output_rel)
{
PgFdwRelationInfo *fpinfo;
/*
* If input rel is not safe to pushdown, then simply return as we cannot
* perform any post-join operations on the foreign server.
*/
if (!input_rel->fdw_private ||
!((PgFdwRelationInfo *) input_rel->fdw_private)->pushdown_safe)
return;
/* Ignore stages we don't support; and skip any duplicate calls. */
if (stage != UPPERREL_GROUP_AGG || output_rel->fdw_private)
return;
fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
fpinfo->pushdown_safe = false;
output_rel->fdw_private = fpinfo;
add_foreign_grouping_paths(root, input_rel, output_rel);
}
/*
* add_foreign_grouping_paths
* Add foreign path for grouping and/or aggregation.
*
* Given input_rel represents the underlying scan. The paths are added to the
* given grouped_rel.
*/
static void
add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel,
RelOptInfo *grouped_rel)
{
Query *parse = root->parse;
PgFdwRelationInfo *ifpinfo = input_rel->fdw_private;
PgFdwRelationInfo *fpinfo = grouped_rel->fdw_private;
ForeignPath *grouppath;
PathTarget *grouping_target;
double rows;
int width;
Cost startup_cost;
Cost total_cost;
/* Nothing to be done, if there is no grouping or aggregation required. */
if (!parse->groupClause && !parse->groupingSets && !parse->hasAggs &&
!root->hasHavingQual)
return;
grouping_target = root->upper_targets[UPPERREL_GROUP_AGG];
/* save the input_rel as outerrel in fpinfo */
fpinfo->outerrel = input_rel;
/*
* Copy foreign table, foreign server, user mapping, shippable extensions
* etc. details from the input relation's fpinfo.
*/
fpinfo->table = ifpinfo->table;
fpinfo->server = ifpinfo->server;
fpinfo->user = ifpinfo->user;
fpinfo->shippable_extensions = ifpinfo->shippable_extensions;
/* Assess if it is safe to push down aggregation and grouping. */
if (!foreign_grouping_ok(root, grouped_rel))
return;
/* Estimate the cost of push down */
estimate_path_cost_size(root, grouped_rel, NIL, NIL, &rows,
&width, &startup_cost, &total_cost);
/* Now update this information in the fpinfo */
fpinfo->rows = rows;
fpinfo->width = width;
fpinfo->startup_cost = startup_cost;
fpinfo->total_cost = total_cost;
/* Create and add foreign path to the grouping relation. */
grouppath = create_foreignscan_path(root,
grouped_rel,
grouping_target,
rows,
startup_cost,
total_cost,
NIL, /* no pathkeys */
NULL, /* no required_outer */
NULL,
NULL); /* no fdw_private */
/* Add generated path into grouped_rel by add_path(). */
add_path(grouped_rel, (Path *) grouppath);
}
/*
* Create a tuple from the specified row of the PGresult.
*
@ -4549,24 +4961,34 @@ conversion_error_callback(void *arg)
ForeignScan *fsplan = (ForeignScan *) fsstate->ss.ps.plan;
EState *estate = fsstate->ss.ps.state;
TargetEntry *tle;
Var *var;
RangeTblEntry *rte;
Assert(IsA(fsplan, ForeignScan));
tle = (TargetEntry *) list_nth(fsplan->fdw_scan_tlist,
errpos->cur_attno - 1);
Assert(IsA(tle, TargetEntry));
var = (Var *) tle->expr;
Assert(IsA(var, Var));
rte = rt_fetch(var->varno, estate->es_range_table);
/*
* Target list can have Vars and expressions. For Vars, we can get
* it's relation, however for expressions we can't. Thus for
* expressions, just show generic context message.
*/
if (IsA(tle->expr, Var))
{
RangeTblEntry *rte;
Var *var = (Var *) tle->expr;
if (var->varattno == 0)
is_wholerow = true;
rte = rt_fetch(var->varno, estate->es_range_table);
if (var->varattno == 0)
is_wholerow = true;
else
attname = get_relid_attribute_name(rte->relid, var->varattno);
relname = get_rel_name(rte->relid);
}
else
attname = get_relid_attribute_name(rte->relid, var->varattno);
relname = get_rel_name(rte->relid);
errcontext("processing expression at position %d in select list",
errpos->cur_attno);
}
if (relname)

View File

@ -92,6 +92,9 @@ typedef struct PgFdwRelationInfo
RelOptInfo *innerrel;
JoinType jointype;
List *joinclauses;
/* Grouping information */
List *grouped_tlist;
} PgFdwRelationInfo;
/* in postgres_fdw.c */
@ -155,7 +158,7 @@ extern void deparseAnalyzeSql(StringInfo buf, Relation rel,
List **retrieved_attrs);
extern void deparseStringLiteral(StringInfo buf, const char *val);
extern Expr *find_em_expr_for_rel(EquivalenceClass *ec, RelOptInfo *rel);
extern List *build_tlist_to_deparse(RelOptInfo *foreign_rel);
extern List *build_tlist_to_deparse(RelOptInfo *foreignrel);
extern void deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root,
RelOptInfo *foreignrel, List *tlist,
List *remote_conds, List *pathkeys,

View File

@ -541,6 +541,310 @@ ALTER VIEW v4 OWNER TO regress_view_owner;
DROP OWNED BY regress_view_owner;
DROP ROLE regress_view_owner;
-- ===================================================================
-- Aggregate and grouping queries
-- ===================================================================
-- Simple aggregates
explain (verbose, costs off)
select count(c6), sum(c1), avg(c1), min(c2), max(c1), stddev(c2), sum(c1) * (random() <= 1)::int as sum2 from ft1 where c2 < 5 group by c2 order by 1, 2;
select count(c6), sum(c1), avg(c1), min(c2), max(c1), stddev(c2), sum(c1) * (random() <= 1)::int as sum2 from ft1 where c2 < 5 group by c2 order by 1, 2;
-- Aggregate is not pushed down as aggregation contains random()
explain (verbose, costs off)
select sum(c1 * (random() <= 1)::int) as sum, avg(c1) from ft1;
-- Aggregate over join query
explain (verbose, costs off)
select count(*), sum(t1.c1), avg(t2.c1) from ft1 t1 inner join ft1 t2 on (t1.c2 = t2.c2) where t1.c2 = 6;
select count(*), sum(t1.c1), avg(t2.c1) from ft1 t1 inner join ft1 t2 on (t1.c2 = t2.c2) where t1.c2 = 6;
-- Not pushed down due to local conditions present in underneath input rel
explain (verbose, costs off)
select sum(t1.c1), count(t2.c1) from ft1 t1 inner join ft2 t2 on (t1.c1 = t2.c1) where ((t1.c1 * t2.c1)/(t1.c1 * t2.c1)) * random() <= 1;
-- GROUP BY clause having expressions
explain (verbose, costs off)
select c2/2, sum(c2) * (c2/2) from ft1 group by c2/2 order by c2/2;
select c2/2, sum(c2) * (c2/2) from ft1 group by c2/2 order by c2/2;
-- Aggregates in subquery are pushed down.
explain (verbose, costs off)
select count(x.a), sum(x.a) from (select c2 a, sum(c1) b from ft1 group by c2, sqrt(c1) order by 1, 2) x;
select count(x.a), sum(x.a) from (select c2 a, sum(c1) b from ft1 group by c2, sqrt(c1) order by 1, 2) x;
-- Aggregate is still pushed down by taking unshippable expression out
explain (verbose, costs off)
select c2 * (random() <= 1)::int as sum1, sum(c1) * c2 as sum2 from ft1 group by c2 order by 1, 2;
select c2 * (random() <= 1)::int as sum1, sum(c1) * c2 as sum2 from ft1 group by c2 order by 1, 2;
-- Aggregate with unshippable GROUP BY clause are not pushed
explain (verbose, costs off)
select c2 * (random() <= 1)::int as c2 from ft2 group by c2 * (random() <= 1)::int order by 1;
-- GROUP BY clause in various forms, cardinal, alias and constant expression
explain (verbose, costs off)
select count(c2) w, c2 x, 5 y, 7.0 z from ft1 group by 2, y, 9.0::int order by 2;
select count(c2) w, c2 x, 5 y, 7.0 z from ft1 group by 2, y, 9.0::int order by 2;
-- Testing HAVING clause shippability
explain (verbose, costs off)
select c2, sum(c1) from ft2 group by c2 having avg(c1) < 500 and sum(c1) < 49800 order by c2;
select c2, sum(c1) from ft2 group by c2 having avg(c1) < 500 and sum(c1) < 49800 order by c2;
-- Using expressions in HAVING clause
explain (verbose, costs off)
select c5, count(c2) from ft1 group by c5, sqrt(c2) having sqrt(max(c2)) = sqrt(2) order by 1, 2;
select c5, count(c2) from ft1 group by c5, sqrt(c2) having sqrt(max(c2)) = sqrt(2) order by 1, 2;
-- Unshippable HAVING clause will be evaluated locally, and other qual in HAVING clause is pushed down
explain (verbose, costs off)
select count(*) from (select c5, count(c1) from ft1 group by c5, sqrt(c2) having (avg(c1) / avg(c1)) * random() <= 1 and avg(c1) < 500) x;
select count(*) from (select c5, count(c1) from ft1 group by c5, sqrt(c2) having (avg(c1) / avg(c1)) * random() <= 1 and avg(c1) < 500) x;
-- Aggregate in HAVING clause is not pushable, and thus aggregation is not pushed down
explain (verbose, costs off)
select sum(c1) from ft1 group by c2 having avg(c1 * (random() <= 1)::int) > 100 order by 1;
-- Testing ORDER BY, DISTINCT, FILTER, Ordered-sets and VARIADIC within aggregates
-- ORDER BY within aggregate, same column used to order
explain (verbose, costs off)
select array_agg(c1 order by c1) from ft1 where c1 < 100 group by c2 order by 1;
select array_agg(c1 order by c1) from ft1 where c1 < 100 group by c2 order by 1;
-- ORDER BY within aggregate, different column used to order also using DESC
explain (verbose, costs off)
select array_agg(c5 order by c1 desc) from ft2 where c2 = 6 and c1 < 50;
select array_agg(c5 order by c1 desc) from ft2 where c2 = 6 and c1 < 50;
-- DISTINCT within aggregate
explain (verbose, costs off)
select array_agg(distinct (t1.c1)%5) from ft4 t1 full join ft5 t2 on (t1.c1 = t2.c1) where t1.c1 < 20 or (t1.c1 is null and t2.c1 < 5) group by (t2.c1)%3 order by 1;
select array_agg(distinct (t1.c1)%5) from ft4 t1 full join ft5 t2 on (t1.c1 = t2.c1) where t1.c1 < 20 or (t1.c1 is null and t2.c1 < 5) group by (t2.c1)%3 order by 1;
-- DISTINCT combined with ORDER BY within aggregate
explain (verbose, costs off)
select array_agg(distinct (t1.c1)%5 order by (t1.c1)%5) from ft4 t1 full join ft5 t2 on (t1.c1 = t2.c1) where t1.c1 < 20 or (t1.c1 is null and t2.c1 < 5) group by (t2.c1)%3 order by 1;
select array_agg(distinct (t1.c1)%5 order by (t1.c1)%5) from ft4 t1 full join ft5 t2 on (t1.c1 = t2.c1) where t1.c1 < 20 or (t1.c1 is null and t2.c1 < 5) group by (t2.c1)%3 order by 1;
explain (verbose, costs off)
select array_agg(distinct (t1.c1)%5 order by (t1.c1)%5 desc nulls last) from ft4 t1 full join ft5 t2 on (t1.c1 = t2.c1) where t1.c1 < 20 or (t1.c1 is null and t2.c1 < 5) group by (t2.c1)%3 order by 1;
select array_agg(distinct (t1.c1)%5 order by (t1.c1)%5 desc nulls last) from ft4 t1 full join ft5 t2 on (t1.c1 = t2.c1) where t1.c1 < 20 or (t1.c1 is null and t2.c1 < 5) group by (t2.c1)%3 order by 1;
-- FILTER within aggregate
explain (verbose, costs off)
select sum(c1) filter (where c1 < 100 and c2 > 5) from ft1 group by c2 order by 1 nulls last;
select sum(c1) filter (where c1 < 100 and c2 > 5) from ft1 group by c2 order by 1 nulls last;
-- DISTINCT, ORDER BY and FILTER within aggregate
explain (verbose, costs off)
select sum(c1%3), sum(distinct c1%3 order by c1%3) filter (where c1%3 < 2), c2 from ft1 where c2 = 6 group by c2;
select sum(c1%3), sum(distinct c1%3 order by c1%3) filter (where c1%3 < 2), c2 from ft1 where c2 = 6 group by c2;
-- Outer query is aggregation query
explain (verbose, costs off)
select distinct (select count(*) filter (where t2.c2 = 6 and t2.c1 < 10) from ft1 t1 where t1.c1 = 6) from ft2 t2 order by 1;
select distinct (select count(*) filter (where t2.c2 = 6 and t2.c1 < 10) from ft1 t1 where t1.c1 = 6) from ft2 t2 order by 1;
-- Inner query is aggregation query
explain (verbose, costs off)
select distinct (select count(t1.c1) filter (where t2.c2 = 6 and t2.c1 < 10) from ft1 t1 where t1.c1 = 6) from ft2 t2 order by 1;
select distinct (select count(t1.c1) filter (where t2.c2 = 6 and t2.c1 < 10) from ft1 t1 where t1.c1 = 6) from ft2 t2 order by 1;
-- Aggregate not pushed down as FILTER condition is not pushable
explain (verbose, costs off)
select sum(c1) filter (where (c1 / c1) * random() <= 1) from ft1 group by c2 order by 1;
explain (verbose, costs off)
select sum(c2) filter (where c2 in (select c2 from ft1 where c2 < 5)) from ft1;
-- Ordered-sets within aggregate
explain (verbose, costs off)
select c2, rank('10'::varchar) within group (order by c6), percentile_cont(c2/10::numeric) within group (order by c1) from ft1 where c2 < 10 group by c2 having percentile_cont(c2/10::numeric) within group (order by c1) < 500 order by c2;
select c2, rank('10'::varchar) within group (order by c6), percentile_cont(c2/10::numeric) within group (order by c1) from ft1 where c2 < 10 group by c2 having percentile_cont(c2/10::numeric) within group (order by c1) < 500 order by c2;
-- Using multiple arguments within aggregates
explain (verbose, costs off)
select c1, rank(c1, c2) within group (order by c1, c2) from ft1 group by c1, c2 having c1 = 6 order by 1;
select c1, rank(c1, c2) within group (order by c1, c2) from ft1 group by c1, c2 having c1 = 6 order by 1;
-- User defined function for user defined aggregate, VARIADIC
create function least_accum(anyelement, variadic anyarray)
returns anyelement language sql as
'select least($1, min($2[i])) from generate_subscripts($2,1) g(i)';
create aggregate least_agg(variadic items anyarray) (
stype = anyelement, sfunc = least_accum
);
-- Not pushed down due to user defined aggregate
explain (verbose, costs off)
select c2, least_agg(c1) from ft1 group by c2 order by c2;
-- Add function and aggregate into extension
alter extension postgres_fdw add function least_accum(anyelement, variadic anyarray);
alter extension postgres_fdw add aggregate least_agg(variadic items anyarray);
alter server loopback options (set extensions 'postgres_fdw');
-- Now aggregate will be pushed. Aggregate will display VARIADIC argument.
explain (verbose, costs off)
select c2, least_agg(c1) from ft1 where c2 < 100 group by c2 order by c2;
select c2, least_agg(c1) from ft1 where c2 < 100 group by c2 order by c2;
-- Remove function and aggregate from extension
alter extension postgres_fdw drop function least_accum(anyelement, variadic anyarray);
alter extension postgres_fdw drop aggregate least_agg(variadic items anyarray);
alter server loopback options (set extensions 'postgres_fdw');
-- Not pushed down as we have dropped objects from extension.
explain (verbose, costs off)
select c2, least_agg(c1) from ft1 group by c2 order by c2;
-- Cleanup
drop aggregate least_agg(variadic items anyarray);
drop function least_accum(anyelement, variadic anyarray);
-- Testing USING OPERATOR() in ORDER BY within aggregate.
-- For this, we need user defined operators along with operator family and
-- operator class. Create those and then add them in extension. Note that
-- user defined objects are considered unshippable unless they are part of
-- the extension.
create operator public.<^ (
leftarg = int4,
rightarg = int4,
procedure = int4eq
);
create operator public.=^ (
leftarg = int4,
rightarg = int4,
procedure = int4lt
);
create operator public.>^ (
leftarg = int4,
rightarg = int4,
procedure = int4gt
);
create operator family my_op_family using btree;
create function my_op_cmp(a int, b int) returns int as
$$begin return btint4cmp(a, b); end $$ language plpgsql;
create operator class my_op_class for type int using btree family my_op_family as
operator 1 public.<^,
operator 3 public.=^,
operator 5 public.>^,
function 1 my_op_cmp(int, int);
-- This will not be pushed as user defined sort operator is not part of the
-- extension yet.
explain (verbose, costs off)
select array_agg(c1 order by c1 using operator(public.<^)) from ft2 where c2 = 6 and c1 < 100 group by c2;
-- Add into extension
alter extension postgres_fdw add operator class my_op_class using btree;
alter extension postgres_fdw add function my_op_cmp(a int, b int);
alter extension postgres_fdw add operator family my_op_family using btree;
alter extension postgres_fdw add operator public.<^(int, int);
alter extension postgres_fdw add operator public.=^(int, int);
alter extension postgres_fdw add operator public.>^(int, int);
alter server loopback options (set extensions 'postgres_fdw');
-- Now this will be pushed as sort operator is part of the extension.
explain (verbose, costs off)
select array_agg(c1 order by c1 using operator(public.<^)) from ft2 where c2 = 6 and c1 < 100 group by c2;
select array_agg(c1 order by c1 using operator(public.<^)) from ft2 where c2 = 6 and c1 < 100 group by c2;
-- Remove from extension
alter extension postgres_fdw drop operator class my_op_class using btree;
alter extension postgres_fdw drop function my_op_cmp(a int, b int);
alter extension postgres_fdw drop operator family my_op_family using btree;
alter extension postgres_fdw drop operator public.<^(int, int);
alter extension postgres_fdw drop operator public.=^(int, int);
alter extension postgres_fdw drop operator public.>^(int, int);
alter server loopback options (set extensions 'postgres_fdw');
-- This will not be pushed as sort operator is now removed from the extension.
explain (verbose, costs off)
select array_agg(c1 order by c1 using operator(public.<^)) from ft2 where c2 = 6 and c1 < 100 group by c2;
-- Cleanup
drop operator class my_op_class using btree;
drop function my_op_cmp(a int, b int);
drop operator family my_op_family using btree;
drop operator public.>^(int, int);
drop operator public.=^(int, int);
drop operator public.<^(int, int);
-- Input relation to aggregate push down hook is not safe to pushdown and thus
-- the aggregate cannot be pushed down to foreign server.
explain (verbose, costs off)
select count(t1.c3) from ft1 t1, ft1 t2 where t1.c1 = postgres_fdw_abs(t1.c2);
-- Subquery in FROM clause having aggregate
explain (verbose, costs off)
select count(*), x.b from ft1, (select c2 a, sum(c1) b from ft1 group by c2) x where ft1.c2 = x.a group by x.b order by 1, 2;
select count(*), x.b from ft1, (select c2 a, sum(c1) b from ft1 group by c2) x where ft1.c2 = x.a group by x.b order by 1, 2;
-- FULL join with IS NULL check in HAVING
explain (verbose, costs off)
select avg(t1.c1), sum(t2.c1) from ft4 t1 full join ft5 t2 on (t1.c1 = t2.c1) group by t2.c1 having (avg(t1.c1) is null and sum(t2.c1) < 10) or sum(t2.c1) is null order by 1 nulls last, 2;
select avg(t1.c1), sum(t2.c1) from ft4 t1 full join ft5 t2 on (t1.c1 = t2.c1) group by t2.c1 having (avg(t1.c1) is null and sum(t2.c1) < 10) or sum(t2.c1) is null order by 1 nulls last, 2;
-- ORDER BY expression is part of the target list but not pushed down to
-- foreign server.
explain (verbose, costs off)
select sum(c2) * (random() <= 1)::int as sum from ft1 order by 1;
select sum(c2) * (random() <= 1)::int as sum from ft1 order by 1;
-- LATERAL join, with parameterization
set enable_hashagg to false;
explain (verbose, costs off)
select c2, sum from "S 1"."T 1" t1, lateral (select sum(t2.c1 + t1."C 1") sum from ft2 t2 group by t2.c1) qry where t1.c2 * 2 = qry.sum and t1.c2 < 10 order by 1;
select c2, sum from "S 1"."T 1" t1, lateral (select sum(t2.c1 + t1."C 1") sum from ft2 t2 group by t2.c1) qry where t1.c2 * 2 = qry.sum and t1.c2 < 10 order by 1;
reset enable_hashagg;
-- Check with placeHolderVars
explain (verbose, costs off)
select q.b, count(ft4.c1), sum(q.a) from ft4 left join (select min(13), avg(ft1.c1), sum(ft2.c1) from ft1 right join ft2 on (ft1.c1 = ft2.c1) where ft1.c1 = 12) q(a, b, c) on (ft4.c1 = q.b) where ft4.c1 between 10 and 15 group by q.b order by 1 nulls last, 2;
select q.b, count(ft4.c1), sum(q.a) from ft4 left join (select min(13), avg(ft1.c1), sum(ft2.c1) from ft1 right join ft2 on (ft1.c1 = ft2.c1) where ft1.c1 = 12) q(a, b, c) on (ft4.c1 = q.b) where ft4.c1 between 10 and 15 group by q.b order by 1 nulls last, 2;
-- Not supported cases
-- Grouping sets
explain (verbose, costs off)
select c2, sum(c1) from ft1 where c2 < 3 group by rollup(c2) order by 1 nulls last;
select c2, sum(c1) from ft1 where c2 < 3 group by rollup(c2) order by 1 nulls last;
explain (verbose, costs off)
select c2, sum(c1) from ft1 where c2 < 3 group by cube(c2) order by 1 nulls last;
select c2, sum(c1) from ft1 where c2 < 3 group by cube(c2) order by 1 nulls last;
explain (verbose, costs off)
select c2, c6, sum(c1) from ft1 where c2 < 3 group by grouping sets(c2, c6) order by 1 nulls last, 2 nulls last;
select c2, c6, sum(c1) from ft1 where c2 < 3 group by grouping sets(c2, c6) order by 1 nulls last, 2 nulls last;
explain (verbose, costs off)
select c2, sum(c1), grouping(c2) from ft1 where c2 < 3 group by c2 order by 1 nulls last;
select c2, sum(c1), grouping(c2) from ft1 where c2 < 3 group by c2 order by 1 nulls last;
-- DISTINCT itself is not pushed down, whereas underneath aggregate is pushed
explain (verbose, costs off)
select distinct sum(c1)/1000 s from ft2 where c2 < 6 group by c2 order by 1;
select distinct sum(c1)/1000 s from ft2 where c2 < 6 group by c2 order by 1;
-- WindowAgg
explain (verbose, costs off)
select c2, sum(c2), count(c2) over (partition by c2%2) from ft2 where c2 < 10 group by c2 order by 1;
select c2, sum(c2), count(c2) over (partition by c2%2) from ft2 where c2 < 10 group by c2 order by 1;
explain (verbose, costs off)
select c2, array_agg(c2) over (partition by c2%2 order by c2 desc) from ft1 where c2 < 10 group by c2 order by 1;
select c2, array_agg(c2) over (partition by c2%2 order by c2 desc) from ft1 where c2 < 10 group by c2 order by 1;
explain (verbose, costs off)
select c2, array_agg(c2) over (partition by c2%2 order by c2 range between current row and unbounded following) from ft1 where c2 < 10 group by c2 order by 1;
select c2, array_agg(c2) over (partition by c2%2 order by c2 range between current row and unbounded following) from ft1 where c2 < 10 group by c2 order by 1;
-- ===================================================================
-- parameterized queries
-- ===================================================================
@ -624,6 +928,7 @@ ALTER FOREIGN TABLE ft1 ALTER COLUMN c8 TYPE int;
SELECT * FROM ft1 WHERE c1 = 1; -- ERROR
SELECT ft1.c1, ft2.c2, ft1.c8 FROM ft1, ft2 WHERE ft1.c1 = ft2.c1 AND ft1.c1 = 1; -- ERROR
SELECT ft1.c1, ft2.c2, ft1 FROM ft1, ft2 WHERE ft1.c1 = ft2.c1 AND ft1.c1 = 1; -- ERROR
SELECT sum(c2), array_agg(c8) FROM ft1 GROUP BY c8; -- ERROR
ALTER FOREIGN TABLE ft1 ALTER COLUMN c8 TYPE user_enum;
-- ===================================================================

View File

@ -3243,8 +3243,15 @@ create_foreignscan_plan(PlannerInfo *root, ForeignPath *best_path,
/* Copy foreign server OID; likewise, no need to make FDW do this */
scan_plan->fs_server = rel->serverid;
/* Likewise, copy the relids that are represented by this foreign scan */
scan_plan->fs_relids = best_path->path.parent->relids;
/*
* Likewise, copy the relids that are represented by this foreign scan. An
* upper rel doesn't have relids set, but it covers all the base relations
* participating in the underlying scan, so use root's all_baserels.
*/
if (rel->reloptkind == RELOPT_UPPER_REL)
scan_plan->fs_relids = root->all_baserels;
else
scan_plan->fs_relids = best_path->path.parent->relids;
/*
* If this is a foreign join, and to make it valid to push down we had to