postgres_fdw: Push down UPDATE/DELETE joins to remote servers.

Commit 0bf3ae88af allowed direct
foreign table modification; instead of fetching each row, updating
it locally, and then pushing the modification back to the remote
side, we would instead do all the work on the remote server via a
single remote UPDATE or DELETE command.  However, that commit only
enabled this optimization when join tree consisted only of the
target table.

This change allows the same optimization when an UPDATE statement
has a FROM clause or a DELETE statement has a USING clause.  This
works much like ordinary foreign join pushdown, in that the tables
must be on the same remote server, relevant parts of the query
must be pushdown-safe, and so forth.

Etsuro Fujita, reviewed by Ashutosh Bapat, Rushabh Lathia, and me.
Some formatting corrections by me.

Discussion: http://postgr.es/m/5A57193A.2080003@lab.ntt.co.jp
Discussion: http://postgr.es/m/b9cee735-62f8-6c07-7528-6364ce9347d0@lab.ntt.co.jp
This commit is contained in:
Robert Haas 2018-02-07 15:34:30 -05:00
parent 7c44b75a2a
commit 1bc0100d27
5 changed files with 863 additions and 92 deletions

View File

@ -132,7 +132,9 @@ static void deparseTargetList(StringInfo buf,
Bitmapset *attrs_used,
bool qualify_col,
List **retrieved_attrs);
static void deparseExplicitTargetList(List *tlist, List **retrieved_attrs,
static void deparseExplicitTargetList(List *tlist,
bool is_returning,
List **retrieved_attrs,
deparse_expr_cxt *context);
static void deparseSubqueryTargetList(deparse_expr_cxt *context);
static void deparseReturningList(StringInfo buf, PlannerInfo *root,
@ -168,11 +170,13 @@ static void deparseLockingClause(deparse_expr_cxt *context);
static void appendOrderByClause(List *pathkeys, deparse_expr_cxt *context);
static void appendConditions(List *exprs, deparse_expr_cxt *context);
static void deparseFromExprForRel(StringInfo buf, PlannerInfo *root,
RelOptInfo *joinrel, bool use_alias, List **params_list);
RelOptInfo *foreignrel, bool use_alias,
Index ignore_rel, List **ignore_conds,
List **params_list);
static void deparseFromExpr(List *quals, deparse_expr_cxt *context);
static void deparseRangeTblRef(StringInfo buf, PlannerInfo *root,
RelOptInfo *foreignrel, bool make_subquery,
List **params_list);
Index ignore_rel, List **ignore_conds, List **params_list);
static void deparseAggref(Aggref *node, deparse_expr_cxt *context);
static void appendGroupByClause(List *tlist, deparse_expr_cxt *context);
static void appendAggOrderBy(List *orderList, List *targetList,
@ -1028,7 +1032,7 @@ deparseSelectSql(List *tlist, bool is_subquery, List **retrieved_attrs,
* For a join or upper relation the input tlist gives the list of
* columns required to be fetched from the foreign server.
*/
deparseExplicitTargetList(tlist, retrieved_attrs, context);
deparseExplicitTargetList(tlist, false, retrieved_attrs, context);
}
else
{
@ -1071,7 +1075,7 @@ deparseFromExpr(List *quals, deparse_expr_cxt *context)
appendStringInfoString(buf, " FROM ");
deparseFromExprForRel(buf, context->root, scanrel,
(bms_num_members(scanrel->relids) > 1),
context->params_list);
(Index) 0, NULL, context->params_list);
/* Construct WHERE clause */
if (quals != NIL)
@ -1340,9 +1344,14 @@ get_jointype_name(JoinType jointype)
*
* retrieved_attrs is the list of continuously increasing integers starting
* from 1. It has same number of entries as tlist.
*
* This is used for both SELECT and RETURNING targetlists; the is_returning
* parameter is true only for a RETURNING targetlist.
*/
static void
deparseExplicitTargetList(List *tlist, List **retrieved_attrs,
deparseExplicitTargetList(List *tlist,
bool is_returning,
List **retrieved_attrs,
deparse_expr_cxt *context)
{
ListCell *lc;
@ -1357,13 +1366,16 @@ deparseExplicitTargetList(List *tlist, List **retrieved_attrs,
if (i > 0)
appendStringInfoString(buf, ", ");
else if (is_returning)
appendStringInfoString(buf, " RETURNING ");
deparseExpr((Expr *) tle->expr, context);
*retrieved_attrs = lappend_int(*retrieved_attrs, i + 1);
i++;
}
if (i == 0)
if (i == 0 && !is_returning)
appendStringInfoString(buf, "NULL");
}
@ -1406,10 +1418,17 @@ deparseSubqueryTargetList(deparse_expr_cxt *context)
* The function constructs ... JOIN ... ON ... for join relation. For a base
* relation it just returns schema-qualified tablename, with the appropriate
* alias if so requested.
*
* 'ignore_rel' is either zero or the RT index of a target relation. In the
* latter case the function constructs FROM clause of UPDATE or USING clause
* of DELETE; it deparses the join relation as if the relation never contained
* the target relation, and creates a List of conditions to be deparsed into
* the top-level WHERE clause, which is returned to *ignore_conds.
*/
static void
deparseFromExprForRel(StringInfo buf, PlannerInfo *root, RelOptInfo *foreignrel,
bool use_alias, List **params_list)
bool use_alias, Index ignore_rel, List **ignore_conds,
List **params_list)
{
PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
@ -1417,16 +1436,89 @@ deparseFromExprForRel(StringInfo buf, PlannerInfo *root, RelOptInfo *foreignrel,
{
StringInfoData join_sql_o;
StringInfoData join_sql_i;
RelOptInfo *outerrel = fpinfo->outerrel;
RelOptInfo *innerrel = fpinfo->innerrel;
bool outerrel_is_target = false;
bool innerrel_is_target = false;
/* Deparse outer relation */
initStringInfo(&join_sql_o);
deparseRangeTblRef(&join_sql_o, root, fpinfo->outerrel,
fpinfo->make_outerrel_subquery, params_list);
if (ignore_rel > 0 && bms_is_member(ignore_rel, foreignrel->relids))
{
/*
* If this is an inner join, add joinclauses to *ignore_conds and
* set it to empty so that those can be deparsed into the WHERE
* clause. Note that since the target relation can never be
* within the nullable side of an outer join, those could safely
* be pulled up into the WHERE clause (see foreign_join_ok()).
* Note also that since the target relation is only inner-joined
* to any other relation in the query, all conditions in the join
* tree mentioning the target relation could be deparsed into the
* WHERE clause by doing this recursively.
*/
if (fpinfo->jointype == JOIN_INNER)
{
*ignore_conds = list_concat(*ignore_conds,
list_copy(fpinfo->joinclauses));
fpinfo->joinclauses = NIL;
}
/* Deparse inner relation */
initStringInfo(&join_sql_i);
deparseRangeTblRef(&join_sql_i, root, fpinfo->innerrel,
fpinfo->make_innerrel_subquery, params_list);
/*
* Check if either of the input relations is the target relation.
*/
if (outerrel->relid == ignore_rel)
outerrel_is_target = true;
else if (innerrel->relid == ignore_rel)
innerrel_is_target = true;
}
/* Deparse outer relation if not the target relation. */
if (!outerrel_is_target)
{
initStringInfo(&join_sql_o);
deparseRangeTblRef(&join_sql_o, root, outerrel,
fpinfo->make_outerrel_subquery,
ignore_rel, ignore_conds, params_list);
/*
* If inner relation is the target relation, skip deparsing it.
* Note that since the join of the target relation with any other
* relation in the query is an inner join and can never be within
* the nullable side of an outer join, the join could be
* interchanged with higher-level joins (cf. identity 1 on outer
* join reordering shown in src/backend/optimizer/README), which
* means it's safe to skip the target-relation deparsing here.
*/
if (innerrel_is_target)
{
Assert(fpinfo->jointype == JOIN_INNER);
Assert(fpinfo->joinclauses == NIL);
appendStringInfo(buf, "%s", join_sql_o.data);
return;
}
}
/* Deparse inner relation if not the target relation. */
if (!innerrel_is_target)
{
initStringInfo(&join_sql_i);
deparseRangeTblRef(&join_sql_i, root, innerrel,
fpinfo->make_innerrel_subquery,
ignore_rel, ignore_conds, params_list);
/*
* If outer relation is the target relation, skip deparsing it.
* See the above note about safety.
*/
if (outerrel_is_target)
{
Assert(fpinfo->jointype == JOIN_INNER);
Assert(fpinfo->joinclauses == NIL);
appendStringInfo(buf, "%s", join_sql_i.data);
return;
}
}
/* Neither of the relations is the target relation. */
Assert(!outerrel_is_target && !innerrel_is_target);
/*
* For a join relation FROM clause entry is deparsed as
@ -1486,7 +1578,8 @@ deparseFromExprForRel(StringInfo buf, PlannerInfo *root, RelOptInfo *foreignrel,
*/
static void
deparseRangeTblRef(StringInfo buf, PlannerInfo *root, RelOptInfo *foreignrel,
bool make_subquery, List **params_list)
bool make_subquery, Index ignore_rel, List **ignore_conds,
List **params_list)
{
PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
@ -1501,6 +1594,14 @@ deparseRangeTblRef(StringInfo buf, PlannerInfo *root, RelOptInfo *foreignrel,
List *retrieved_attrs;
int ncols;
/*
* The given relation shouldn't contain the target relation, because
* this should only happen for input relations for a full join, and
* such relations can never contain an UPDATE/DELETE target.
*/
Assert(ignore_rel == 0 ||
!bms_is_member(ignore_rel, foreignrel->relids));
/* Deparse the subquery representing the relation. */
appendStringInfoChar(buf, '(');
deparseSelectStmtForRel(buf, root, foreignrel, NIL,
@ -1534,7 +1635,8 @@ deparseRangeTblRef(StringInfo buf, PlannerInfo *root, RelOptInfo *foreignrel,
}
}
else
deparseFromExprForRel(buf, root, foreignrel, true, params_list);
deparseFromExprForRel(buf, root, foreignrel, true, ignore_rel,
ignore_conds, params_list);
}
/*
@ -1645,13 +1747,23 @@ deparseUpdateSql(StringInfo buf, PlannerInfo *root,
/*
* deparse remote UPDATE statement
*
* The statement text is appended to buf, and we also create an integer List
* of the columns being retrieved by RETURNING (if any), which is returned
* to *retrieved_attrs.
* 'buf' is the output buffer to append the statement to
* 'rtindex' is the RT index of the associated target relation
* 'rel' is the relation descriptor for the target relation
* 'foreignrel' is the RelOptInfo for the target relation or the join relation
* containing all base relations in the query
* 'targetlist' is the tlist of the underlying foreign-scan plan node
* 'targetAttrs' is the target columns of the UPDATE
* 'remote_conds' is the qual clauses that must be evaluated remotely
* '*params_list' is an output list of exprs that will become remote Params
* 'returningList' is the RETURNING targetlist
* '*retrieved_attrs' is an output list of integers of columns being retrieved
* by RETURNING (if any)
*/
void
deparseDirectUpdateSql(StringInfo buf, PlannerInfo *root,
Index rtindex, Relation rel,
RelOptInfo *foreignrel,
List *targetlist,
List *targetAttrs,
List *remote_conds,
@ -1659,7 +1771,6 @@ deparseDirectUpdateSql(StringInfo buf, PlannerInfo *root,
List *returningList,
List **retrieved_attrs)
{
RelOptInfo *baserel = root->simple_rel_array[rtindex];
deparse_expr_cxt context;
int nestlevel;
bool first;
@ -1667,13 +1778,15 @@ deparseDirectUpdateSql(StringInfo buf, PlannerInfo *root,
/* Set up context struct for recursion */
context.root = root;
context.foreignrel = baserel;
context.scanrel = baserel;
context.foreignrel = foreignrel;
context.scanrel = foreignrel;
context.buf = buf;
context.params_list = params_list;
appendStringInfoString(buf, "UPDATE ");
deparseRelation(buf, rel);
if (foreignrel->reloptkind == RELOPT_JOINREL)
appendStringInfo(buf, " %s%d", REL_ALIAS_PREFIX, rtindex);
appendStringInfoString(buf, " SET ");
/* Make sure any constants in the exprs are printed portably */
@ -1700,14 +1813,28 @@ deparseDirectUpdateSql(StringInfo buf, PlannerInfo *root,
reset_transmission_modes(nestlevel);
if (foreignrel->reloptkind == RELOPT_JOINREL)
{
List *ignore_conds = NIL;
appendStringInfo(buf, " FROM ");
deparseFromExprForRel(buf, root, foreignrel, true, rtindex,
&ignore_conds, params_list);
remote_conds = list_concat(remote_conds, ignore_conds);
}
if (remote_conds)
{
appendStringInfoString(buf, " WHERE ");
appendConditions(remote_conds, &context);
}
deparseReturningList(buf, root, rtindex, rel, false,
returningList, retrieved_attrs);
if (foreignrel->reloptkind == RELOPT_JOINREL)
deparseExplicitTargetList(returningList, true, retrieved_attrs,
&context);
else
deparseReturningList(buf, root, rtindex, rel, false,
returningList, retrieved_attrs);
}
/*
@ -1735,30 +1862,49 @@ deparseDeleteSql(StringInfo buf, PlannerInfo *root,
/*
* deparse remote DELETE statement
*
* The statement text is appended to buf, and we also create an integer List
* of the columns being retrieved by RETURNING (if any), which is returned
* to *retrieved_attrs.
* 'buf' is the output buffer to append the statement to
* 'rtindex' is the RT index of the associated target relation
* 'rel' is the relation descriptor for the target relation
* 'foreignrel' is the RelOptInfo for the target relation or the join relation
* containing all base relations in the query
* 'remote_conds' is the qual clauses that must be evaluated remotely
* '*params_list' is an output list of exprs that will become remote Params
* 'returningList' is the RETURNING targetlist
* '*retrieved_attrs' is an output list of integers of columns being retrieved
* by RETURNING (if any)
*/
void
deparseDirectDeleteSql(StringInfo buf, PlannerInfo *root,
Index rtindex, Relation rel,
RelOptInfo *foreignrel,
List *remote_conds,
List **params_list,
List *returningList,
List **retrieved_attrs)
{
RelOptInfo *baserel = root->simple_rel_array[rtindex];
deparse_expr_cxt context;
/* Set up context struct for recursion */
context.root = root;
context.foreignrel = baserel;
context.scanrel = baserel;
context.foreignrel = foreignrel;
context.scanrel = foreignrel;
context.buf = buf;
context.params_list = params_list;
appendStringInfoString(buf, "DELETE FROM ");
deparseRelation(buf, rel);
if (foreignrel->reloptkind == RELOPT_JOINREL)
appendStringInfo(buf, " %s%d", REL_ALIAS_PREFIX, rtindex);
if (foreignrel->reloptkind == RELOPT_JOINREL)
{
List *ignore_conds = NIL;
appendStringInfo(buf, " USING ");
deparseFromExprForRel(buf, root, foreignrel, true, rtindex,
&ignore_conds, params_list);
remote_conds = list_concat(remote_conds, ignore_conds);
}
if (remote_conds)
{
@ -1766,8 +1912,12 @@ deparseDirectDeleteSql(StringInfo buf, PlannerInfo *root,
appendConditions(remote_conds, &context);
}
deparseReturningList(buf, root, rtindex, rel, false,
returningList, retrieved_attrs);
if (foreignrel->reloptkind == RELOPT_JOINREL)
deparseExplicitTargetList(returningList, true, retrieved_attrs,
&context);
else
deparseReturningList(buf, root, rtindex, rel, false,
returningList, retrieved_attrs);
}
/*

View File

@ -4399,27 +4399,13 @@ UPDATE ft2 SET c2 = c2 + 400, c3 = c3 || '_update7' WHERE c1 % 10 = 7 RETURNING
EXPLAIN (verbose, costs off)
UPDATE ft2 SET c2 = ft2.c2 + 500, c3 = ft2.c3 || '_update9', c7 = DEFAULT
FROM ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 9; -- can't be pushed down
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
FROM ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 9; -- can be pushed down
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Update on public.ft2
Remote SQL: UPDATE "S 1"."T 1" SET c2 = $2, c3 = $3, c7 = $4 WHERE ctid = $1
-> Foreign Scan
Output: ft2.c1, (ft2.c2 + 500), NULL::integer, (ft2.c3 || '_update9'::text), ft2.c4, ft2.c5, ft2.c6, 'ft2 '::character(10), ft2.c8, ft2.ctid, ft1.*
Relations: (public.ft2) INNER JOIN (public.ft1)
Remote SQL: SELECT r1."C 1", r1.c2, r1.c3, r1.c4, r1.c5, r1.c6, r1.c8, r1.ctid, CASE WHEN (r2.*)::text IS NOT NULL THEN ROW(r2."C 1", r2.c2, r2.c3, r2.c4, r2.c5, r2.c6, r2.c7, r2.c8) END FROM ("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (((r1.c2 = r2."C 1")) AND (((r2."C 1" % 10) = 9)))) FOR UPDATE OF r1
-> Hash Join
Output: ft2.c1, ft2.c2, ft2.c3, ft2.c4, ft2.c5, ft2.c6, ft2.c8, ft2.ctid, ft1.*
Hash Cond: (ft2.c2 = ft1.c1)
-> Foreign Scan on public.ft2
Output: ft2.c1, ft2.c2, ft2.c3, ft2.c4, ft2.c5, ft2.c6, ft2.c8, ft2.ctid
Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c8, ctid FROM "S 1"."T 1" FOR UPDATE
-> Hash
Output: ft1.*, ft1.c1
-> Foreign Scan on public.ft1
Output: ft1.*, ft1.c1
Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE ((("C 1" % 10) = 9))
(17 rows)
-> Foreign Update
Remote SQL: UPDATE "S 1"."T 1" r1 SET c2 = (r1.c2 + 500), c3 = (r1.c3 || '_update9'::text), c7 = 'ft2 '::character(10) FROM "S 1"."T 1" r2 WHERE ((r1.c2 = r2."C 1")) AND (((r2."C 1" % 10) = 9))
(3 rows)
UPDATE ft2 SET c2 = ft2.c2 + 500, c3 = ft2.c3 || '_update9', c7 = DEFAULT
FROM ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 9;
@ -4542,27 +4528,13 @@ DELETE FROM ft2 WHERE c1 % 10 = 5 RETURNING c1, c4;
(103 rows)
EXPLAIN (verbose, costs off)
DELETE FROM ft2 USING ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 2; -- can't be pushed down
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
DELETE FROM ft2 USING ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 2; -- can be pushed down
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------
Delete on public.ft2
Remote SQL: DELETE FROM "S 1"."T 1" WHERE ctid = $1
-> Foreign Scan
Output: ft2.ctid, ft1.*
Relations: (public.ft2) INNER JOIN (public.ft1)
Remote SQL: SELECT r1.ctid, CASE WHEN (r2.*)::text IS NOT NULL THEN ROW(r2."C 1", r2.c2, r2.c3, r2.c4, r2.c5, r2.c6, r2.c7, r2.c8) END FROM ("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (((r1.c2 = r2."C 1")) AND (((r2."C 1" % 10) = 2)))) FOR UPDATE OF r1
-> Hash Join
Output: ft2.ctid, ft1.*
Hash Cond: (ft2.c2 = ft1.c1)
-> Foreign Scan on public.ft2
Output: ft2.ctid, ft2.c2
Remote SQL: SELECT c2, ctid FROM "S 1"."T 1" FOR UPDATE
-> Hash
Output: ft1.*, ft1.c1
-> Foreign Scan on public.ft1
Output: ft1.*, ft1.c1
Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE ((("C 1" % 10) = 2))
(17 rows)
-> Foreign Delete
Remote SQL: DELETE FROM "S 1"."T 1" r1 USING "S 1"."T 1" r2 WHERE ((r1.c2 = r2."C 1")) AND (((r2."C 1" % 10) = 2))
(3 rows)
DELETE FROM ft2 USING ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 2;
SELECT c1,c2,c3,c4 FROM ft2 ORDER BY c1;
@ -5438,6 +5410,195 @@ DELETE FROM ft2 WHERE c1 = 9999 RETURNING tableoid::regclass;
ft2
(1 row)
-- Test UPDATE/DELETE with RETURNING on a three-table join
INSERT INTO ft2 (c1,c2,c3)
SELECT id, id - 1200, to_char(id, 'FM00000') FROM generate_series(1201, 1300) id;
EXPLAIN (verbose, costs off)
UPDATE ft2 SET c3 = 'foo'
FROM ft4 INNER JOIN ft5 ON (ft4.c1 = ft5.c1)
WHERE ft2.c1 > 1200 AND ft2.c2 = ft4.c1
RETURNING ft2.ctid, ft2, ft2.*, ft4.ctid, ft4, ft4.*; -- can be pushed down
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Update on public.ft2
Output: ft2.ctid, ft2.*, ft2.c1, ft2.c2, ft2.c3, ft2.c4, ft2.c5, ft2.c6, ft2.c7, ft2.c8, ft4.ctid, ft4.*, ft4.c1, ft4.c2, ft4.c3
-> Foreign Update
Remote SQL: UPDATE "S 1"."T 1" r1 SET c3 = 'foo'::text FROM ("S 1"."T 3" r2 INNER JOIN "S 1"."T 4" r3 ON (TRUE)) WHERE ((r2.c1 = r3.c1)) AND ((r1.c2 = r2.c1)) AND ((r1."C 1" > 1200)) RETURNING r1."C 1", r1.c2, r1.c3, r1.c4, r1.c5, r1.c6, r1.c7, r1.c8, r1.ctid, r2.ctid, CASE WHEN (r2.*)::text IS NOT NULL THEN ROW(r2.c1, r2.c2, r2.c3) END, r2.c1, r2.c2, r2.c3
(4 rows)
UPDATE ft2 SET c3 = 'foo'
FROM ft4 INNER JOIN ft5 ON (ft4.c1 = ft5.c1)
WHERE ft2.c1 > 1200 AND ft2.c2 = ft4.c1
RETURNING ft2.ctid, ft2, ft2.*, ft4.ctid, ft4, ft4.*;
ctid | ft2 | c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 | ctid | ft4 | c1 | c2 | c3
----------+--------------------------------+------+----+-----+----+----+----+------------+----+--------+----------------+----+----+--------
(12,102) | (1206,6,foo,,,,"ft2 ",) | 1206 | 6 | foo | | | | ft2 | | (0,6) | (6,7,AAA006) | 6 | 7 | AAA006
(12,103) | (1212,12,foo,,,,"ft2 ",) | 1212 | 12 | foo | | | | ft2 | | (0,12) | (12,13,AAA012) | 12 | 13 | AAA012
(12,104) | (1218,18,foo,,,,"ft2 ",) | 1218 | 18 | foo | | | | ft2 | | (0,18) | (18,19,AAA018) | 18 | 19 | AAA018
(12,105) | (1224,24,foo,,,,"ft2 ",) | 1224 | 24 | foo | | | | ft2 | | (0,24) | (24,25,AAA024) | 24 | 25 | AAA024
(12,106) | (1230,30,foo,,,,"ft2 ",) | 1230 | 30 | foo | | | | ft2 | | (0,30) | (30,31,AAA030) | 30 | 31 | AAA030
(12,107) | (1236,36,foo,,,,"ft2 ",) | 1236 | 36 | foo | | | | ft2 | | (0,36) | (36,37,AAA036) | 36 | 37 | AAA036
(12,108) | (1242,42,foo,,,,"ft2 ",) | 1242 | 42 | foo | | | | ft2 | | (0,42) | (42,43,AAA042) | 42 | 43 | AAA042
(12,109) | (1248,48,foo,,,,"ft2 ",) | 1248 | 48 | foo | | | | ft2 | | (0,48) | (48,49,AAA048) | 48 | 49 | AAA048
(12,110) | (1254,54,foo,,,,"ft2 ",) | 1254 | 54 | foo | | | | ft2 | | (0,54) | (54,55,AAA054) | 54 | 55 | AAA054
(12,111) | (1260,60,foo,,,,"ft2 ",) | 1260 | 60 | foo | | | | ft2 | | (0,60) | (60,61,AAA060) | 60 | 61 | AAA060
(12,112) | (1266,66,foo,,,,"ft2 ",) | 1266 | 66 | foo | | | | ft2 | | (0,66) | (66,67,AAA066) | 66 | 67 | AAA066
(12,113) | (1272,72,foo,,,,"ft2 ",) | 1272 | 72 | foo | | | | ft2 | | (0,72) | (72,73,AAA072) | 72 | 73 | AAA072
(12,114) | (1278,78,foo,,,,"ft2 ",) | 1278 | 78 | foo | | | | ft2 | | (0,78) | (78,79,AAA078) | 78 | 79 | AAA078
(12,115) | (1284,84,foo,,,,"ft2 ",) | 1284 | 84 | foo | | | | ft2 | | (0,84) | (84,85,AAA084) | 84 | 85 | AAA084
(12,116) | (1290,90,foo,,,,"ft2 ",) | 1290 | 90 | foo | | | | ft2 | | (0,90) | (90,91,AAA090) | 90 | 91 | AAA090
(12,117) | (1296,96,foo,,,,"ft2 ",) | 1296 | 96 | foo | | | | ft2 | | (0,96) | (96,97,AAA096) | 96 | 97 | AAA096
(16 rows)
EXPLAIN (verbose, costs off)
DELETE FROM ft2
USING ft4 LEFT JOIN ft5 ON (ft4.c1 = ft5.c1)
WHERE ft2.c1 > 1200 AND ft2.c1 % 10 = 0 AND ft2.c2 = ft4.c1
RETURNING 100; -- can be pushed down
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Delete on public.ft2
Output: 100
-> Foreign Delete
Remote SQL: DELETE FROM "S 1"."T 1" r1 USING ("S 1"."T 3" r2 LEFT JOIN "S 1"."T 4" r3 ON (((r2.c1 = r3.c1)))) WHERE ((r1.c2 = r2.c1)) AND ((r1."C 1" > 1200)) AND (((r1."C 1" % 10) = 0))
(4 rows)
DELETE FROM ft2
USING ft4 LEFT JOIN ft5 ON (ft4.c1 = ft5.c1)
WHERE ft2.c1 > 1200 AND ft2.c1 % 10 = 0 AND ft2.c2 = ft4.c1
RETURNING 100;
?column?
----------
100
100
100
100
100
100
100
100
100
100
(10 rows)
DELETE FROM ft2 WHERE ft2.c1 > 1200;
-- Test UPDATE/DELETE with WHERE or JOIN/ON conditions containing
-- user-defined operators/functions
ALTER SERVER loopback OPTIONS (DROP extensions);
INSERT INTO ft2 (c1,c2,c3)
SELECT id, id % 10, to_char(id, 'FM00000') FROM generate_series(2001, 2010) id;
EXPLAIN (verbose, costs off)
UPDATE ft2 SET c3 = 'bar' WHERE postgres_fdw_abs(c1) > 2000 RETURNING *; -- can't be pushed down
QUERY PLAN
----------------------------------------------------------------------------------------------------------
Update on public.ft2
Output: c1, c2, c3, c4, c5, c6, c7, c8
Remote SQL: UPDATE "S 1"."T 1" SET c3 = $2 WHERE ctid = $1 RETURNING "C 1", c2, c3, c4, c5, c6, c7, c8
-> Foreign Scan on public.ft2
Output: c1, c2, NULL::integer, 'bar'::text, c4, c5, c6, c7, c8, ctid
Filter: (postgres_fdw_abs(ft2.c1) > 2000)
Remote SQL: SELECT "C 1", c2, c4, c5, c6, c7, c8, ctid FROM "S 1"."T 1" FOR UPDATE
(7 rows)
UPDATE ft2 SET c3 = 'bar' WHERE postgres_fdw_abs(c1) > 2000 RETURNING *;
c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8
------+----+-----+----+----+----+------------+----
2001 | 1 | bar | | | | ft2 |
2002 | 2 | bar | | | | ft2 |
2003 | 3 | bar | | | | ft2 |
2004 | 4 | bar | | | | ft2 |
2005 | 5 | bar | | | | ft2 |
2006 | 6 | bar | | | | ft2 |
2007 | 7 | bar | | | | ft2 |
2008 | 8 | bar | | | | ft2 |
2009 | 9 | bar | | | | ft2 |
2010 | 0 | bar | | | | ft2 |
(10 rows)
EXPLAIN (verbose, costs off)
UPDATE ft2 SET c3 = 'baz'
FROM ft4 INNER JOIN ft5 ON (ft4.c1 = ft5.c1)
WHERE ft2.c1 > 2000 AND ft2.c2 === ft4.c1
RETURNING ft2.*, ft4.*, ft5.*; -- can't be pushed down
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Update on public.ft2
Output: ft2.c1, ft2.c2, ft2.c3, ft2.c4, ft2.c5, ft2.c6, ft2.c7, ft2.c8, ft4.c1, ft4.c2, ft4.c3, ft5.c1, ft5.c2, ft5.c3
Remote SQL: UPDATE "S 1"."T 1" SET c3 = $2 WHERE ctid = $1 RETURNING "C 1", c2, c3, c4, c5, c6, c7, c8
-> Nested Loop
Output: ft2.c1, ft2.c2, NULL::integer, 'baz'::text, ft2.c4, ft2.c5, ft2.c6, ft2.c7, ft2.c8, ft2.ctid, ft4.*, ft5.*, ft4.c1, ft4.c2, ft4.c3, ft5.c1, ft5.c2, ft5.c3
Join Filter: (ft2.c2 === ft4.c1)
-> Foreign Scan on public.ft2
Output: ft2.c1, ft2.c2, ft2.c4, ft2.c5, ft2.c6, ft2.c7, ft2.c8, ft2.ctid
Remote SQL: SELECT "C 1", c2, c4, c5, c6, c7, c8, ctid FROM "S 1"."T 1" WHERE (("C 1" > 2000)) FOR UPDATE
-> Foreign Scan
Output: ft4.*, ft4.c1, ft4.c2, ft4.c3, ft5.*, ft5.c1, ft5.c2, ft5.c3
Relations: (public.ft4) INNER JOIN (public.ft5)
Remote SQL: SELECT CASE WHEN (r2.*)::text IS NOT NULL THEN ROW(r2.c1, r2.c2, r2.c3) END, r2.c1, r2.c2, r2.c3, CASE WHEN (r3.*)::text IS NOT NULL THEN ROW(r3.c1, r3.c2, r3.c3) END, r3.c1, r3.c2, r3.c3 FROM ("S 1"."T 3" r2 INNER JOIN "S 1"."T 4" r3 ON (((r2.c1 = r3.c1))))
-> Hash Join
Output: ft4.*, ft4.c1, ft4.c2, ft4.c3, ft5.*, ft5.c1, ft5.c2, ft5.c3
Hash Cond: (ft4.c1 = ft5.c1)
-> Foreign Scan on public.ft4
Output: ft4.*, ft4.c1, ft4.c2, ft4.c3
Remote SQL: SELECT c1, c2, c3 FROM "S 1"."T 3"
-> Hash
Output: ft5.*, ft5.c1, ft5.c2, ft5.c3
-> Foreign Scan on public.ft5
Output: ft5.*, ft5.c1, ft5.c2, ft5.c3
Remote SQL: SELECT c1, c2, c3 FROM "S 1"."T 4"
(24 rows)
UPDATE ft2 SET c3 = 'baz'
FROM ft4 INNER JOIN ft5 ON (ft4.c1 = ft5.c1)
WHERE ft2.c1 > 2000 AND ft2.c2 === ft4.c1
RETURNING ft2.*, ft4.*, ft5.*;
c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 | c1 | c2 | c3 | c1 | c2 | c3
------+----+-----+----+----+----+------------+----+----+----+--------+----+----+--------
2006 | 6 | baz | | | | ft2 | | 6 | 7 | AAA006 | 6 | 7 | AAA006
(1 row)
EXPLAIN (verbose, costs off)
DELETE FROM ft2
USING ft4 INNER JOIN ft5 ON (ft4.c1 === ft5.c1)
WHERE ft2.c1 > 2000 AND ft2.c2 = ft4.c1
RETURNING ft2.ctid, ft2.c1, ft2.c2, ft2.c3; -- can't be pushed down
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Delete on public.ft2
Output: ft2.ctid, ft2.c1, ft2.c2, ft2.c3
Remote SQL: DELETE FROM "S 1"."T 1" WHERE ctid = $1 RETURNING "C 1", c2, c3, ctid
-> Foreign Scan
Output: ft2.ctid, ft4.*, ft5.*
Filter: (ft4.c1 === ft5.c1)
Relations: ((public.ft2) INNER JOIN (public.ft4)) INNER JOIN (public.ft5)
Remote SQL: SELECT r1.ctid, CASE WHEN (r2.*)::text IS NOT NULL THEN ROW(r2.c1, r2.c2, r2.c3) END, CASE WHEN (r3.*)::text IS NOT NULL THEN ROW(r3.c1, r3.c2, r3.c3) END, r2.c1, r3.c1 FROM (("S 1"."T 1" r1 INNER JOIN "S 1"."T 3" r2 ON (((r1.c2 = r2.c1)) AND ((r1."C 1" > 2000)))) INNER JOIN "S 1"."T 4" r3 ON (TRUE)) FOR UPDATE OF r1
-> Nested Loop
Output: ft2.ctid, ft4.*, ft5.*, ft4.c1, ft5.c1
-> Nested Loop
Output: ft2.ctid, ft4.*, ft4.c1
Join Filter: (ft2.c2 = ft4.c1)
-> Foreign Scan on public.ft2
Output: ft2.ctid, ft2.c2
Remote SQL: SELECT c2, ctid FROM "S 1"."T 1" WHERE (("C 1" > 2000)) FOR UPDATE
-> Foreign Scan on public.ft4
Output: ft4.*, ft4.c1
Remote SQL: SELECT c1, c2, c3 FROM "S 1"."T 3"
-> Foreign Scan on public.ft5
Output: ft5.*, ft5.c1
Remote SQL: SELECT c1, c2, c3 FROM "S 1"."T 4"
(22 rows)
DELETE FROM ft2
USING ft4 INNER JOIN ft5 ON (ft4.c1 === ft5.c1)
WHERE ft2.c1 > 2000 AND ft2.c2 = ft4.c1
RETURNING ft2.ctid, ft2.c1, ft2.c2, ft2.c3;
ctid | c1 | c2 | c3
----------+------+----+-----
(12,112) | 2006 | 6 | baz
(1 row)
DELETE FROM ft2 WHERE ft2.c1 > 2000;
ALTER SERVER loopback OPTIONS (ADD extensions 'postgres_fdw');
-- Test that trigger on remote table works as expected
CREATE OR REPLACE FUNCTION "S 1".F_BRTRIG() RETURNS trigger AS $$
BEGIN

View File

@ -210,6 +210,11 @@ typedef struct PgFdwDirectModifyState
PGresult *result; /* result for query */
int num_tuples; /* # of result tuples */
int next_tuple; /* index of next one to return */
Relation resultRel; /* relcache entry for the target relation */
AttrNumber *attnoMap; /* array of attnums of input user columns */
AttrNumber ctidAttno; /* attnum of input ctid column */
AttrNumber oidAttno; /* attnum of input oid column */
bool hasSystemCols; /* are there system columns of resultRel? */
/* working memory context */
MemoryContext temp_cxt; /* context for per-tuple temporary data */
@ -376,8 +381,17 @@ static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
TupleTableSlot *slot);
static void store_returning_result(PgFdwModifyState *fmstate,
TupleTableSlot *slot, PGresult *res);
static List *build_remote_returning(Index rtindex, Relation rel,
List *returningList);
static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
static void execute_dml_stmt(ForeignScanState *node);
static TupleTableSlot *get_returning_data(ForeignScanState *node);
static void init_returning_filter(PgFdwDirectModifyState *dmstate,
List *fdw_scan_tlist,
Index rtindex);
static TupleTableSlot *apply_returning_filter(PgFdwDirectModifyState *dmstate,
TupleTableSlot *slot,
EState *estate);
static void prepare_query_params(PlanState *node,
List *fdw_exprs,
int numParams,
@ -2144,14 +2158,15 @@ postgresPlanDirectModify(PlannerInfo *root,
if (subplan->qual != NIL)
return false;
/*
* We can't handle an UPDATE or DELETE on a foreign join for now.
*/
if (fscan->scan.scanrelid == 0)
return false;
/* Safe to fetch data about the target foreign rel */
foreignrel = root->simple_rel_array[resultRelation];
if (fscan->scan.scanrelid == 0)
{
foreignrel = find_join_rel(root, fscan->fs_relids);
/* We should have a rel for this foreign join. */
Assert(foreignrel);
}
else
foreignrel = root->simple_rel_array[resultRelation];
rte = root->simple_rte_array[resultRelation];
fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
@ -2212,8 +2227,23 @@ postgresPlanDirectModify(PlannerInfo *root,
* Extract the relevant RETURNING list if any.
*/
if (plan->returningLists)
{
returningList = (List *) list_nth(plan->returningLists, subplan_index);
/*
* When performing an UPDATE/DELETE .. RETURNING on a join directly,
* we fetch from the foreign server any Vars specified in RETURNING
* that refer not only to the target relation but to non-target
* relations. So we'll deparse them into the RETURNING clause of the
* remote query; use a targetlist consisting of them instead, which
* will be adjusted to be new fdw_scan_tlist of the foreign-scan plan
* node below.
*/
if (fscan->scan.scanrelid == 0)
returningList = build_remote_returning(resultRelation, rel,
returningList);
}
/*
* Construct the SQL command string.
*/
@ -2221,6 +2251,7 @@ postgresPlanDirectModify(PlannerInfo *root,
{
case CMD_UPDATE:
deparseDirectUpdateSql(&sql, root, resultRelation, rel,
foreignrel,
((Plan *) fscan)->targetlist,
targetAttrs,
remote_exprs, &params_list,
@ -2228,6 +2259,7 @@ postgresPlanDirectModify(PlannerInfo *root,
break;
case CMD_DELETE:
deparseDirectDeleteSql(&sql, root, resultRelation, rel,
foreignrel,
remote_exprs, &params_list,
returningList, &retrieved_attrs);
break;
@ -2255,6 +2287,19 @@ postgresPlanDirectModify(PlannerInfo *root,
retrieved_attrs,
makeInteger(plan->canSetTag));
/*
* Update the foreign-join-related fields.
*/
if (fscan->scan.scanrelid == 0)
{
/* No need for the outer subplan. */
fscan->scan.plan.lefttree = NULL;
/* Build new fdw_scan_tlist if UPDATE/DELETE .. RETURNING. */
if (returningList)
rebuild_fdw_scan_tlist(fscan, returningList);
}
heap_close(rel, NoLock);
return true;
}
@ -2269,6 +2314,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
EState *estate = node->ss.ps.state;
PgFdwDirectModifyState *dmstate;
Index rtindex;
RangeTblEntry *rte;
Oid userid;
ForeignTable *table;
@ -2291,11 +2337,15 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
* Identify which user to do the remote access as. This should match what
* ExecCheckRTEPerms() does.
*/
rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table);
rtindex = estate->es_result_relation_info->ri_RangeTableIndex;
rte = rt_fetch(rtindex, estate->es_range_table);
userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
/* Get info about foreign table. */
dmstate->rel = node->ss.ss_currentRelation;
if (fsplan->scan.scanrelid == 0)
dmstate->rel = ExecOpenScanRelation(estate, rtindex, eflags);
else
dmstate->rel = node->ss.ss_currentRelation;
table = GetForeignTable(RelationGetRelid(dmstate->rel));
user = GetUserMapping(userid, table->serverid);
@ -2305,6 +2355,21 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
*/
dmstate->conn = GetConnection(user, false);
/* Update the foreign-join-related fields. */
if (fsplan->scan.scanrelid == 0)
{
/* Save info about foreign table. */
dmstate->resultRel = dmstate->rel;
/*
* Set dmstate->rel to NULL to teach get_returning_data() and
* make_tuple_from_result_row() that columns fetched from the remote
* server are described by fdw_scan_tlist of the foreign-scan plan
* node, not the tuple descriptor for the target relation.
*/
dmstate->rel = NULL;
}
/* Initialize state variable */
dmstate->num_tuples = -1; /* -1 means not set yet */
@ -2325,7 +2390,24 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
/* Prepare for input conversion of RETURNING results. */
if (dmstate->has_returning)
dmstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(dmstate->rel));
{
TupleDesc tupdesc;
if (fsplan->scan.scanrelid == 0)
tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
else
tupdesc = RelationGetDescr(dmstate->rel);
dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
/*
* When performing an UPDATE/DELETE .. RETURNING on a join directly,
* initialize a filter to extract an updated/deleted tuple from a scan
* tuple.
*/
if (fsplan->scan.scanrelid == 0)
init_returning_filter(dmstate, fsplan->fdw_scan_tlist, rtindex);
}
/*
* Prepare for processing of parameters used in remote query, if any.
@ -2406,6 +2488,10 @@ postgresEndDirectModify(ForeignScanState *node)
ReleaseConnection(dmstate->conn);
dmstate->conn = NULL;
/* close the target relation. */
if (dmstate->resultRel)
ExecCloseScanRelation(dmstate->resultRel);
/* MemoryContext will be deleted automatically. */
}
@ -3272,6 +3358,136 @@ store_returning_result(PgFdwModifyState *fmstate,
PG_END_TRY();
}
/*
* build_remote_returning
* Build a RETURNING targetlist of a remote query for performing an
* UPDATE/DELETE .. RETURNING on a join directly
*/
static List *
build_remote_returning(Index rtindex, Relation rel, List *returningList)
{
bool have_wholerow = false;
List *tlist = NIL;
List *vars;
ListCell *lc;
Assert(returningList);
vars = pull_var_clause((Node *) returningList, PVC_INCLUDE_PLACEHOLDERS);
/*
* If there's a whole-row reference to the target relation, then we'll
* need all the columns of the relation.
*/
foreach(lc, vars)
{
Var *var = (Var *) lfirst(lc);
if (IsA(var, Var) &&
var->varno == rtindex &&
var->varattno == InvalidAttrNumber)
{
have_wholerow = true;
break;
}
}
if (have_wholerow)
{
TupleDesc tupdesc = RelationGetDescr(rel);
int i;
for (i = 1; i <= tupdesc->natts; i++)
{
Form_pg_attribute attr = TupleDescAttr(tupdesc, i - 1);
Var *var;
/* Ignore dropped attributes. */
if (attr->attisdropped)
continue;
var = makeVar(rtindex,
i,
attr->atttypid,
attr->atttypmod,
attr->attcollation,
0);
tlist = lappend(tlist,
makeTargetEntry((Expr *) var,
list_length(tlist) + 1,
NULL,
false));
}
}
/* Now add any remaining columns to tlist. */
foreach(lc, vars)
{
Var *var = (Var *) lfirst(lc);
/*
* No need for whole-row references to the target relation. We don't
* need system columns other than ctid and oid either, since those are
* set locally.
*/
if (IsA(var, Var) &&
var->varno == rtindex &&
var->varattno <= InvalidAttrNumber &&
var->varattno != SelfItemPointerAttributeNumber &&
var->varattno != ObjectIdAttributeNumber)
continue; /* don't need it */
if (tlist_member((Expr *) var, tlist))
continue; /* already got it */
tlist = lappend(tlist,
makeTargetEntry((Expr *) var,
list_length(tlist) + 1,
NULL,
false));
}
list_free(vars);
return tlist;
}
/*
* rebuild_fdw_scan_tlist
* Build new fdw_scan_tlist of given foreign-scan plan node from given
* tlist
*
* There might be columns that the fdw_scan_tlist of the given foreign-scan
* plan node contains that the given tlist doesn't. The fdw_scan_tlist would
* have contained resjunk columns such as 'ctid' of the target relation and
* 'wholerow' of non-target relations, but the tlist might not contain them,
* for example. So, adjust the tlist so it contains all the columns specified
* in the fdw_scan_tlist; else setrefs.c will get confused.
*/
static void
rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist)
{
List *new_tlist = tlist;
List *old_tlist = fscan->fdw_scan_tlist;
ListCell *lc;
foreach(lc, old_tlist)
{
TargetEntry *tle = (TargetEntry *) lfirst(lc);
if (tlist_member(tle->expr, new_tlist))
continue; /* already got it */
new_tlist = lappend(new_tlist,
makeTargetEntry(tle->expr,
list_length(new_tlist) + 1,
NULL,
false));
}
fscan->fdw_scan_tlist = new_tlist;
}
/*
* Execute a direct UPDATE/DELETE statement.
*/
@ -3332,6 +3548,7 @@ get_returning_data(ForeignScanState *node)
EState *estate = node->ss.ps.state;
ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
TupleTableSlot *resultSlot;
Assert(resultRelInfo->ri_projectReturning);
@ -3349,7 +3566,10 @@ get_returning_data(ForeignScanState *node)
* "UPDATE/DELETE .. RETURNING 1" for example.)
*/
if (!dmstate->has_returning)
{
ExecStoreAllNullTuple(slot);
resultSlot = slot;
}
else
{
/*
@ -3365,7 +3585,7 @@ get_returning_data(ForeignScanState *node)
dmstate->rel,
dmstate->attinmeta,
dmstate->retrieved_attrs,
NULL,
node,
dmstate->temp_cxt);
ExecStoreTuple(newtup, slot, InvalidBuffer, false);
}
@ -3376,15 +3596,204 @@ get_returning_data(ForeignScanState *node)
PG_RE_THROW();
}
PG_END_TRY();
/* Get the updated/deleted tuple. */
if (dmstate->rel)
resultSlot = slot;
else
resultSlot = apply_returning_filter(dmstate, slot, estate);
}
dmstate->next_tuple++;
/* Make slot available for evaluation of the local query RETURNING list. */
resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple = slot;
resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple =
resultSlot;
return slot;
}
/*
* Initialize a filter to extract an updated/deleted tuple from a scan tuple.
*/
static void
init_returning_filter(PgFdwDirectModifyState *dmstate,
List *fdw_scan_tlist,
Index rtindex)
{
TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
ListCell *lc;
int i;
/*
* Calculate the mapping between the fdw_scan_tlist's entries and the
* result tuple's attributes.
*
* The "map" is an array of indexes of the result tuple's attributes in
* fdw_scan_tlist, i.e., one entry for every attribute of the result
* tuple. We store zero for any attributes that don't have the
* corresponding entries in that list, marking that a NULL is needed in
* the result tuple.
*
* Also get the indexes of the entries for ctid and oid if any.
*/
dmstate->attnoMap = (AttrNumber *)
palloc0(resultTupType->natts * sizeof(AttrNumber));
dmstate->ctidAttno = dmstate->oidAttno = 0;
i = 1;
dmstate->hasSystemCols = false;
foreach(lc, fdw_scan_tlist)
{
TargetEntry *tle = (TargetEntry *) lfirst(lc);
Var *var = (Var *) tle->expr;
Assert(IsA(var, Var));
/*
* If the Var is a column of the target relation to be retrieved from
* the foreign server, get the index of the entry.
*/
if (var->varno == rtindex &&
list_member_int(dmstate->retrieved_attrs, i))
{
int attrno = var->varattno;
if (attrno < 0)
{
/*
* We don't retrieve system columns other than ctid and oid.
*/
if (attrno == SelfItemPointerAttributeNumber)
dmstate->ctidAttno = i;
else if (attrno == ObjectIdAttributeNumber)
dmstate->oidAttno = i;
else
Assert(false);
dmstate->hasSystemCols = true;
}
else
{
/*
* We don't retrieve whole-row references to the target
* relation either.
*/
Assert(attrno > 0);
dmstate->attnoMap[attrno - 1] = i;
}
}
i++;
}
}
/*
* Extract and return an updated/deleted tuple from a scan tuple.
*/
static TupleTableSlot *
apply_returning_filter(PgFdwDirectModifyState *dmstate,
TupleTableSlot *slot,
EState *estate)
{
TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
TupleTableSlot *resultSlot;
Datum *values;
bool *isnull;
Datum *old_values;
bool *old_isnull;
int i;
/*
* Use the trigger tuple slot as a place to store the result tuple.
*/
resultSlot = estate->es_trig_tuple_slot;
if (resultSlot->tts_tupleDescriptor != resultTupType)
ExecSetSlotDescriptor(resultSlot, resultTupType);
/*
* Extract all the values of the scan tuple.
*/
slot_getallattrs(slot);
old_values = slot->tts_values;
old_isnull = slot->tts_isnull;
/*
* Prepare to build the result tuple.
*/
ExecClearTuple(resultSlot);
values = resultSlot->tts_values;
isnull = resultSlot->tts_isnull;
/*
* Transpose data into proper fields of the result tuple.
*/
for (i = 0; i < resultTupType->natts; i++)
{
int j = dmstate->attnoMap[i];
if (j == 0)
{
values[i] = (Datum) 0;
isnull[i] = true;
}
else
{
values[i] = old_values[j - 1];
isnull[i] = old_isnull[j - 1];
}
}
/*
* Build the virtual tuple.
*/
ExecStoreVirtualTuple(resultSlot);
/*
* If we have any system columns to return, install them.
*/
if (dmstate->hasSystemCols)
{
HeapTuple resultTup = ExecMaterializeSlot(resultSlot);
/* ctid */
if (dmstate->ctidAttno)
{
ItemPointer ctid = NULL;
ctid = (ItemPointer) DatumGetPointer(old_values[dmstate->ctidAttno - 1]);
resultTup->t_self = *ctid;
}
/* oid */
if (dmstate->oidAttno)
{
Oid oid = InvalidOid;
oid = DatumGetObjectId(old_values[dmstate->oidAttno - 1]);
HeapTupleSetOid(resultTup, oid);
}
/*
* And remaining columns
*
* Note: since we currently don't allow the target relation to appear
* on the nullable side of an outer join, any system columns wouldn't
* go to NULL.
*
* Note: no need to care about tableoid here because it will be
* initialized in ExecProcessReturning().
*/
HeapTupleHeaderSetXmin(resultTup->t_data, InvalidTransactionId);
HeapTupleHeaderSetXmax(resultTup->t_data, InvalidTransactionId);
HeapTupleHeaderSetCmin(resultTup->t_data, InvalidTransactionId);
}
/*
* And return the result tuple.
*/
return resultSlot;
}
/*
* Prepare for processing of parameters used in remote query.
*/
@ -4954,11 +5363,8 @@ make_tuple_from_result_row(PGresult *res,
tupdesc = RelationGetDescr(rel);
else
{
PgFdwScanState *fdw_sstate;
Assert(fsstate);
fdw_sstate = (PgFdwScanState *) fsstate->fdw_state;
tupdesc = fdw_sstate->tupdesc;
tupdesc = fsstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
}
values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));

View File

@ -150,6 +150,7 @@ extern void deparseUpdateSql(StringInfo buf, PlannerInfo *root,
List **retrieved_attrs);
extern void deparseDirectUpdateSql(StringInfo buf, PlannerInfo *root,
Index rtindex, Relation rel,
RelOptInfo *foreignrel,
List *targetlist,
List *targetAttrs,
List *remote_conds,
@ -162,6 +163,7 @@ extern void deparseDeleteSql(StringInfo buf, PlannerInfo *root,
List **retrieved_attrs);
extern void deparseDirectDeleteSql(StringInfo buf, PlannerInfo *root,
Index rtindex, Relation rel,
RelOptInfo *foreignrel,
List *remote_conds,
List **params_list,
List *returningList,

View File

@ -1082,14 +1082,14 @@ UPDATE ft2 SET c2 = c2 + 400, c3 = c3 || '_update7' WHERE c1 % 10 = 7 RETURNING
UPDATE ft2 SET c2 = c2 + 400, c3 = c3 || '_update7' WHERE c1 % 10 = 7 RETURNING *;
EXPLAIN (verbose, costs off)
UPDATE ft2 SET c2 = ft2.c2 + 500, c3 = ft2.c3 || '_update9', c7 = DEFAULT
FROM ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 9; -- can't be pushed down
FROM ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 9; -- can be pushed down
UPDATE ft2 SET c2 = ft2.c2 + 500, c3 = ft2.c3 || '_update9', c7 = DEFAULT
FROM ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 9;
EXPLAIN (verbose, costs off)
DELETE FROM ft2 WHERE c1 % 10 = 5 RETURNING c1, c4; -- can be pushed down
DELETE FROM ft2 WHERE c1 % 10 = 5 RETURNING c1, c4;
EXPLAIN (verbose, costs off)
DELETE FROM ft2 USING ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 2; -- can't be pushed down
DELETE FROM ft2 USING ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 2; -- can be pushed down
DELETE FROM ft2 USING ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 2;
SELECT c1,c2,c3,c4 FROM ft2 ORDER BY c1;
EXPLAIN (verbose, costs off)
@ -1102,6 +1102,58 @@ EXPLAIN (verbose, costs off)
DELETE FROM ft2 WHERE c1 = 9999 RETURNING tableoid::regclass; -- can be pushed down
DELETE FROM ft2 WHERE c1 = 9999 RETURNING tableoid::regclass;
-- Test UPDATE/DELETE with RETURNING on a three-table join
INSERT INTO ft2 (c1,c2,c3)
SELECT id, id - 1200, to_char(id, 'FM00000') FROM generate_series(1201, 1300) id;
EXPLAIN (verbose, costs off)
UPDATE ft2 SET c3 = 'foo'
FROM ft4 INNER JOIN ft5 ON (ft4.c1 = ft5.c1)
WHERE ft2.c1 > 1200 AND ft2.c2 = ft4.c1
RETURNING ft2.ctid, ft2, ft2.*, ft4.ctid, ft4, ft4.*; -- can be pushed down
UPDATE ft2 SET c3 = 'foo'
FROM ft4 INNER JOIN ft5 ON (ft4.c1 = ft5.c1)
WHERE ft2.c1 > 1200 AND ft2.c2 = ft4.c1
RETURNING ft2.ctid, ft2, ft2.*, ft4.ctid, ft4, ft4.*;
EXPLAIN (verbose, costs off)
DELETE FROM ft2
USING ft4 LEFT JOIN ft5 ON (ft4.c1 = ft5.c1)
WHERE ft2.c1 > 1200 AND ft2.c1 % 10 = 0 AND ft2.c2 = ft4.c1
RETURNING 100; -- can be pushed down
DELETE FROM ft2
USING ft4 LEFT JOIN ft5 ON (ft4.c1 = ft5.c1)
WHERE ft2.c1 > 1200 AND ft2.c1 % 10 = 0 AND ft2.c2 = ft4.c1
RETURNING 100;
DELETE FROM ft2 WHERE ft2.c1 > 1200;
-- Test UPDATE/DELETE with WHERE or JOIN/ON conditions containing
-- user-defined operators/functions
ALTER SERVER loopback OPTIONS (DROP extensions);
INSERT INTO ft2 (c1,c2,c3)
SELECT id, id % 10, to_char(id, 'FM00000') FROM generate_series(2001, 2010) id;
EXPLAIN (verbose, costs off)
UPDATE ft2 SET c3 = 'bar' WHERE postgres_fdw_abs(c1) > 2000 RETURNING *; -- can't be pushed down
UPDATE ft2 SET c3 = 'bar' WHERE postgres_fdw_abs(c1) > 2000 RETURNING *;
EXPLAIN (verbose, costs off)
UPDATE ft2 SET c3 = 'baz'
FROM ft4 INNER JOIN ft5 ON (ft4.c1 = ft5.c1)
WHERE ft2.c1 > 2000 AND ft2.c2 === ft4.c1
RETURNING ft2.*, ft4.*, ft5.*; -- can't be pushed down
UPDATE ft2 SET c3 = 'baz'
FROM ft4 INNER JOIN ft5 ON (ft4.c1 = ft5.c1)
WHERE ft2.c1 > 2000 AND ft2.c2 === ft4.c1
RETURNING ft2.*, ft4.*, ft5.*;
EXPLAIN (verbose, costs off)
DELETE FROM ft2
USING ft4 INNER JOIN ft5 ON (ft4.c1 === ft5.c1)
WHERE ft2.c1 > 2000 AND ft2.c2 = ft4.c1
RETURNING ft2.ctid, ft2.c1, ft2.c2, ft2.c3; -- can't be pushed down
DELETE FROM ft2
USING ft4 INNER JOIN ft5 ON (ft4.c1 === ft5.c1)
WHERE ft2.c1 > 2000 AND ft2.c2 = ft4.c1
RETURNING ft2.ctid, ft2.c1, ft2.c2, ft2.c3;
DELETE FROM ft2 WHERE ft2.c1 > 2000;
ALTER SERVER loopback OPTIONS (ADD extensions 'postgres_fdw');
-- Test that trigger on remote table works as expected
CREATE OR REPLACE FUNCTION "S 1".F_BRTRIG() RETURNS trigger AS $$
BEGIN