Allow insert and update tuple routing and COPY for foreign tables.

Also enable this for postgres_fdw.

Etsuro Fujita, based on an earlier patch by Amit Langote. The larger
patch series of which this is a part has been reviewed by Amit
Langote, David Fetter, Maksim Milyutin, Álvaro Herrera, Stephen Frost,
and me.  Minor documentation changes to the final version by me.

Discussion: http://postgr.es/m/29906a26-da12-8c86-4fb9-d8f88442f2b9@lab.ntt.co.jp
This commit is contained in:
Robert Haas 2018-04-06 19:16:11 -04:00
parent cb1ff1e5af
commit 3d956d9562
16 changed files with 924 additions and 91 deletions

View File

@ -136,6 +136,11 @@ DELETE FROM agg_csv WHERE a = 100;
-- but this should be allowed
SELECT * FROM agg_csv FOR UPDATE;
-- copy from isn't supported either
COPY agg_csv FROM STDIN;
12 3.4
\.
-- constraint exclusion tests
\t on
EXPLAIN (VERBOSE, COSTS FALSE) SELECT * FROM agg_csv WHERE a < 0;

View File

@ -221,6 +221,9 @@ SELECT * FROM agg_csv FOR UPDATE;
42 | 324.78
(3 rows)
-- copy from isn't supported either
COPY agg_csv FROM STDIN;
ERROR: cannot insert into foreign table "agg_csv"
-- constraint exclusion tests
\t on
EXPLAIN (VERBOSE, COSTS FALSE) SELECT * FROM agg_csv WHERE a < 0;
@ -315,7 +318,7 @@ SELECT tableoid::regclass, * FROM p2;
(0 rows)
COPY pt FROM '@abs_srcdir@/data/list2.bad' with (format 'csv', delimiter ','); -- ERROR
ERROR: cannot route inserted tuples to a foreign table
ERROR: cannot insert into foreign table "p1"
CONTEXT: COPY pt, line 2: "1,qux"
COPY pt FROM '@abs_srcdir@/data/list2.csv' with (format 'csv', delimiter ',');
SELECT tableoid::regclass, * FROM pt;
@ -342,10 +345,10 @@ SELECT tableoid::regclass, * FROM p2;
(2 rows)
INSERT INTO pt VALUES (1, 'xyzzy'); -- ERROR
ERROR: cannot route inserted tuples to a foreign table
ERROR: cannot insert into foreign table "p1"
INSERT INTO pt VALUES (2, 'xyzzy');
UPDATE pt set a = 1 where a = 2; -- ERROR
ERROR: cannot route inserted tuples to a foreign table
ERROR: cannot insert into foreign table "p1"
SELECT tableoid::regclass, * FROM pt;
tableoid | a | b
----------+---+-------

View File

@ -7371,6 +7371,340 @@ NOTICE: drop cascades to foreign table bar2
drop table loct1;
drop table loct2;
-- ===================================================================
-- test tuple routing for foreign-table partitions
-- ===================================================================
-- Test insert tuple routing
create table itrtest (a int, b text) partition by list (a);
create table loct1 (a int check (a in (1)), b text);
create foreign table remp1 (a int check (a in (1)), b text) server loopback options (table_name 'loct1');
create table loct2 (a int check (a in (2)), b text);
create foreign table remp2 (b text, a int check (a in (2))) server loopback options (table_name 'loct2');
alter table itrtest attach partition remp1 for values in (1);
alter table itrtest attach partition remp2 for values in (2);
insert into itrtest values (1, 'foo');
insert into itrtest values (1, 'bar') returning *;
a | b
---+-----
1 | bar
(1 row)
insert into itrtest values (2, 'baz');
insert into itrtest values (2, 'qux') returning *;
a | b
---+-----
2 | qux
(1 row)
insert into itrtest values (1, 'test1'), (2, 'test2') returning *;
a | b
---+-------
1 | test1
2 | test2
(2 rows)
select tableoid::regclass, * FROM itrtest;
tableoid | a | b
----------+---+-------
remp1 | 1 | foo
remp1 | 1 | bar
remp1 | 1 | test1
remp2 | 2 | baz
remp2 | 2 | qux
remp2 | 2 | test2
(6 rows)
select tableoid::regclass, * FROM remp1;
tableoid | a | b
----------+---+-------
remp1 | 1 | foo
remp1 | 1 | bar
remp1 | 1 | test1
(3 rows)
select tableoid::regclass, * FROM remp2;
tableoid | b | a
----------+-------+---
remp2 | baz | 2
remp2 | qux | 2
remp2 | test2 | 2
(3 rows)
delete from itrtest;
create unique index loct1_idx on loct1 (a);
-- DO NOTHING without an inference specification is supported
insert into itrtest values (1, 'foo') on conflict do nothing returning *;
a | b
---+-----
1 | foo
(1 row)
insert into itrtest values (1, 'foo') on conflict do nothing returning *;
a | b
---+---
(0 rows)
-- But other cases are not supported
insert into itrtest values (1, 'bar') on conflict (a) do nothing;
ERROR: there is no unique or exclusion constraint matching the ON CONFLICT specification
insert into itrtest values (1, 'bar') on conflict (a) do update set b = excluded.b;
ERROR: there is no unique or exclusion constraint matching the ON CONFLICT specification
select tableoid::regclass, * FROM itrtest;
tableoid | a | b
----------+---+-----
remp1 | 1 | foo
(1 row)
drop table itrtest;
drop table loct1;
drop table loct2;
-- Test update tuple routing
create table utrtest (a int, b text) partition by list (a);
create table loct (a int check (a in (1)), b text);
create foreign table remp (a int check (a in (1)), b text) server loopback options (table_name 'loct');
create table locp (a int check (a in (2)), b text);
alter table utrtest attach partition remp for values in (1);
alter table utrtest attach partition locp for values in (2);
insert into utrtest values (1, 'foo');
insert into utrtest values (2, 'qux');
select tableoid::regclass, * FROM utrtest;
tableoid | a | b
----------+---+-----
remp | 1 | foo
locp | 2 | qux
(2 rows)
select tableoid::regclass, * FROM remp;
tableoid | a | b
----------+---+-----
remp | 1 | foo
(1 row)
select tableoid::regclass, * FROM locp;
tableoid | a | b
----------+---+-----
locp | 2 | qux
(1 row)
-- It's not allowed to move a row from a partition that is foreign to another
update utrtest set a = 2 where b = 'foo' returning *;
ERROR: new row for relation "loct" violates check constraint "loct_a_check"
DETAIL: Failing row contains (2, foo).
CONTEXT: remote SQL command: UPDATE public.loct SET a = 2 WHERE ((b = 'foo'::text)) RETURNING a, b
-- But the reverse is allowed
update utrtest set a = 1 where b = 'qux' returning *;
a | b
---+-----
1 | qux
(1 row)
select tableoid::regclass, * FROM utrtest;
tableoid | a | b
----------+---+-----
remp | 1 | foo
remp | 1 | qux
(2 rows)
select tableoid::regclass, * FROM remp;
tableoid | a | b
----------+---+-----
remp | 1 | foo
remp | 1 | qux
(2 rows)
select tableoid::regclass, * FROM locp;
tableoid | a | b
----------+---+---
(0 rows)
-- The executor should not let unexercised FDWs shut down
update utrtest set a = 1 where b = 'foo';
drop table utrtest;
drop table loct;
-- Test copy tuple routing
create table ctrtest (a int, b text) partition by list (a);
create table loct1 (a int check (a in (1)), b text);
create foreign table remp1 (a int check (a in (1)), b text) server loopback options (table_name 'loct1');
create table loct2 (a int check (a in (2)), b text);
create foreign table remp2 (b text, a int check (a in (2))) server loopback options (table_name 'loct2');
alter table ctrtest attach partition remp1 for values in (1);
alter table ctrtest attach partition remp2 for values in (2);
copy ctrtest from stdin;
select tableoid::regclass, * FROM ctrtest;
tableoid | a | b
----------+---+-----
remp1 | 1 | foo
remp2 | 2 | qux
(2 rows)
select tableoid::regclass, * FROM remp1;
tableoid | a | b
----------+---+-----
remp1 | 1 | foo
(1 row)
select tableoid::regclass, * FROM remp2;
tableoid | b | a
----------+-----+---
remp2 | qux | 2
(1 row)
-- Copying into foreign partitions directly should work as well
copy remp1 from stdin;
select tableoid::regclass, * FROM remp1;
tableoid | a | b
----------+---+-----
remp1 | 1 | foo
remp1 | 1 | bar
(2 rows)
drop table ctrtest;
drop table loct1;
drop table loct2;
-- ===================================================================
-- test COPY FROM
-- ===================================================================
create table loc2 (f1 int, f2 text);
alter table loc2 set (autovacuum_enabled = 'false');
create foreign table rem2 (f1 int, f2 text) server loopback options(table_name 'loc2');
-- Test basic functionality
copy rem2 from stdin;
select * from rem2;
f1 | f2
----+-----
1 | foo
2 | bar
(2 rows)
delete from rem2;
-- Test check constraints
alter table loc2 add constraint loc2_f1positive check (f1 >= 0);
alter foreign table rem2 add constraint rem2_f1positive check (f1 >= 0);
-- check constraint is enforced on the remote side, not locally
copy rem2 from stdin;
copy rem2 from stdin; -- ERROR
ERROR: new row for relation "loc2" violates check constraint "loc2_f1positive"
DETAIL: Failing row contains (-1, xyzzy).
CONTEXT: remote SQL command: INSERT INTO public.loc2(f1, f2) VALUES ($1, $2)
COPY rem2, line 1: "-1 xyzzy"
select * from rem2;
f1 | f2
----+-----
1 | foo
2 | bar
(2 rows)
alter foreign table rem2 drop constraint rem2_f1positive;
alter table loc2 drop constraint loc2_f1positive;
delete from rem2;
-- Test local triggers
create trigger trig_stmt_before before insert on rem2
for each statement execute procedure trigger_func();
create trigger trig_stmt_after after insert on rem2
for each statement execute procedure trigger_func();
create trigger trig_row_before before insert on rem2
for each row execute procedure trigger_data(23,'skidoo');
create trigger trig_row_after after insert on rem2
for each row execute procedure trigger_data(23,'skidoo');
copy rem2 from stdin;
NOTICE: trigger_func(<NULL>) called: action = INSERT, when = BEFORE, level = STATEMENT
NOTICE: trig_row_before(23, skidoo) BEFORE ROW INSERT ON rem2
NOTICE: NEW: (1,foo)
NOTICE: trig_row_before(23, skidoo) BEFORE ROW INSERT ON rem2
NOTICE: NEW: (2,bar)
NOTICE: trig_row_after(23, skidoo) AFTER ROW INSERT ON rem2
NOTICE: NEW: (1,foo)
NOTICE: trig_row_after(23, skidoo) AFTER ROW INSERT ON rem2
NOTICE: NEW: (2,bar)
NOTICE: trigger_func(<NULL>) called: action = INSERT, when = AFTER, level = STATEMENT
select * from rem2;
f1 | f2
----+-----
1 | foo
2 | bar
(2 rows)
drop trigger trig_row_before on rem2;
drop trigger trig_row_after on rem2;
drop trigger trig_stmt_before on rem2;
drop trigger trig_stmt_after on rem2;
delete from rem2;
create trigger trig_row_before_insert before insert on rem2
for each row execute procedure trig_row_before_insupdate();
-- The new values are concatenated with ' triggered !'
copy rem2 from stdin;
select * from rem2;
f1 | f2
----+-----------------
1 | foo triggered !
2 | bar triggered !
(2 rows)
drop trigger trig_row_before_insert on rem2;
delete from rem2;
create trigger trig_null before insert on rem2
for each row execute procedure trig_null();
-- Nothing happens
copy rem2 from stdin;
select * from rem2;
f1 | f2
----+----
(0 rows)
drop trigger trig_null on rem2;
delete from rem2;
-- Test remote triggers
create trigger trig_row_before_insert before insert on loc2
for each row execute procedure trig_row_before_insupdate();
-- The new values are concatenated with ' triggered !'
copy rem2 from stdin;
select * from rem2;
f1 | f2
----+-----------------
1 | foo triggered !
2 | bar triggered !
(2 rows)
drop trigger trig_row_before_insert on loc2;
delete from rem2;
create trigger trig_null before insert on loc2
for each row execute procedure trig_null();
-- Nothing happens
copy rem2 from stdin;
select * from rem2;
f1 | f2
----+----
(0 rows)
drop trigger trig_null on loc2;
delete from rem2;
-- Test a combination of local and remote triggers
create trigger rem2_trig_row_before before insert on rem2
for each row execute procedure trigger_data(23,'skidoo');
create trigger rem2_trig_row_after after insert on rem2
for each row execute procedure trigger_data(23,'skidoo');
create trigger loc2_trig_row_before_insert before insert on loc2
for each row execute procedure trig_row_before_insupdate();
copy rem2 from stdin;
NOTICE: rem2_trig_row_before(23, skidoo) BEFORE ROW INSERT ON rem2
NOTICE: NEW: (1,foo)
NOTICE: rem2_trig_row_before(23, skidoo) BEFORE ROW INSERT ON rem2
NOTICE: NEW: (2,bar)
NOTICE: rem2_trig_row_after(23, skidoo) AFTER ROW INSERT ON rem2
NOTICE: NEW: (1,"foo triggered !")
NOTICE: rem2_trig_row_after(23, skidoo) AFTER ROW INSERT ON rem2
NOTICE: NEW: (2,"bar triggered !")
select * from rem2;
f1 | f2
----+-----------------
1 | foo triggered !
2 | bar triggered !
(2 rows)
drop trigger rem2_trig_row_before on rem2;
drop trigger rem2_trig_row_after on rem2;
drop trigger loc2_trig_row_before_insert on loc2;
delete from rem2;
-- ===================================================================
-- test IMPORT FOREIGN SCHEMA
-- ===================================================================
CREATE SCHEMA import_source;

View File

@ -319,6 +319,10 @@ static TupleTableSlot *postgresExecForeignDelete(EState *estate,
TupleTableSlot *planSlot);
static void postgresEndForeignModify(EState *estate,
ResultRelInfo *resultRelInfo);
static void postgresBeginForeignInsert(ModifyTableState *mtstate,
ResultRelInfo *resultRelInfo);
static void postgresEndForeignInsert(EState *estate,
ResultRelInfo *resultRelInfo);
static int postgresIsForeignRelUpdatable(Relation rel);
static bool postgresPlanDirectModify(PlannerInfo *root,
ModifyTable *plan,
@ -473,6 +477,8 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
routine->ExecForeignUpdate = postgresExecForeignUpdate;
routine->ExecForeignDelete = postgresExecForeignDelete;
routine->EndForeignModify = postgresEndForeignModify;
routine->BeginForeignInsert = postgresBeginForeignInsert;
routine->EndForeignInsert = postgresEndForeignInsert;
routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
routine->PlanDirectModify = postgresPlanDirectModify;
routine->BeginDirectModify = postgresBeginDirectModify;
@ -1959,6 +1965,96 @@ postgresEndForeignModify(EState *estate,
finish_foreign_modify(fmstate);
}
/*
* postgresBeginForeignInsert
* Begin an insert operation on a foreign table
*/
static void
postgresBeginForeignInsert(ModifyTableState *mtstate,
ResultRelInfo *resultRelInfo)
{
PgFdwModifyState *fmstate;
Plan *plan = mtstate->ps.plan;
Relation rel = resultRelInfo->ri_RelationDesc;
RangeTblEntry *rte;
Query *query;
PlannerInfo *root;
TupleDesc tupdesc = RelationGetDescr(rel);
int attnum;
StringInfoData sql;
List *targetAttrs = NIL;
List *retrieved_attrs = NIL;
bool doNothing = false;
initStringInfo(&sql);
/* Set up largely-dummy planner state. */
rte = makeNode(RangeTblEntry);
rte->rtekind = RTE_RELATION;
rte->relid = RelationGetRelid(rel);
rte->relkind = RELKIND_FOREIGN_TABLE;
query = makeNode(Query);
query->commandType = CMD_INSERT;
query->resultRelation = 1;
query->rtable = list_make1(rte);
root = makeNode(PlannerInfo);
root->parse = query;
/* We transmit all columns that are defined in the foreign table. */
for (attnum = 1; attnum <= tupdesc->natts; attnum++)
{
Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
if (!attr->attisdropped)
targetAttrs = lappend_int(targetAttrs, attnum);
}
/* Check if we add the ON CONFLICT clause to the remote query. */
if (plan)
{
OnConflictAction onConflictAction = ((ModifyTable *) plan)->onConflictAction;
/* We only support DO NOTHING without an inference specification. */
if (onConflictAction == ONCONFLICT_NOTHING)
doNothing = true;
else if (onConflictAction != ONCONFLICT_NONE)
elog(ERROR, "unexpected ON CONFLICT specification: %d",
(int) onConflictAction);
}
/* Construct the SQL command string. */
deparseInsertSql(&sql, root, 1, rel, targetAttrs, doNothing,
resultRelInfo->ri_returningList, &retrieved_attrs);
/* Construct an execution state. */
fmstate = create_foreign_modify(mtstate->ps.state,
resultRelInfo,
CMD_INSERT,
NULL,
sql.data,
targetAttrs,
retrieved_attrs != NIL,
retrieved_attrs);
resultRelInfo->ri_FdwState = fmstate;
}
/*
* postgresEndForeignInsert
* Finish an insert operation on a foreign table
*/
static void
postgresEndForeignInsert(EState *estate,
ResultRelInfo *resultRelInfo)
{
PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
Assert(fmstate != NULL);
/* Destroy the execution state */
finish_foreign_modify(fmstate);
}
/*
* postgresIsForeignRelUpdatable
* Determine whether a foreign table supports INSERT, UPDATE and/or

View File

@ -1767,6 +1767,243 @@ drop table bar cascade;
drop table loct1;
drop table loct2;
-- ===================================================================
-- test tuple routing for foreign-table partitions
-- ===================================================================
-- Test insert tuple routing
create table itrtest (a int, b text) partition by list (a);
create table loct1 (a int check (a in (1)), b text);
create foreign table remp1 (a int check (a in (1)), b text) server loopback options (table_name 'loct1');
create table loct2 (a int check (a in (2)), b text);
create foreign table remp2 (b text, a int check (a in (2))) server loopback options (table_name 'loct2');
alter table itrtest attach partition remp1 for values in (1);
alter table itrtest attach partition remp2 for values in (2);
insert into itrtest values (1, 'foo');
insert into itrtest values (1, 'bar') returning *;
insert into itrtest values (2, 'baz');
insert into itrtest values (2, 'qux') returning *;
insert into itrtest values (1, 'test1'), (2, 'test2') returning *;
select tableoid::regclass, * FROM itrtest;
select tableoid::regclass, * FROM remp1;
select tableoid::regclass, * FROM remp2;
delete from itrtest;
create unique index loct1_idx on loct1 (a);
-- DO NOTHING without an inference specification is supported
insert into itrtest values (1, 'foo') on conflict do nothing returning *;
insert into itrtest values (1, 'foo') on conflict do nothing returning *;
-- But other cases are not supported
insert into itrtest values (1, 'bar') on conflict (a) do nothing;
insert into itrtest values (1, 'bar') on conflict (a) do update set b = excluded.b;
select tableoid::regclass, * FROM itrtest;
drop table itrtest;
drop table loct1;
drop table loct2;
-- Test update tuple routing
create table utrtest (a int, b text) partition by list (a);
create table loct (a int check (a in (1)), b text);
create foreign table remp (a int check (a in (1)), b text) server loopback options (table_name 'loct');
create table locp (a int check (a in (2)), b text);
alter table utrtest attach partition remp for values in (1);
alter table utrtest attach partition locp for values in (2);
insert into utrtest values (1, 'foo');
insert into utrtest values (2, 'qux');
select tableoid::regclass, * FROM utrtest;
select tableoid::regclass, * FROM remp;
select tableoid::regclass, * FROM locp;
-- It's not allowed to move a row from a partition that is foreign to another
update utrtest set a = 2 where b = 'foo' returning *;
-- But the reverse is allowed
update utrtest set a = 1 where b = 'qux' returning *;
select tableoid::regclass, * FROM utrtest;
select tableoid::regclass, * FROM remp;
select tableoid::regclass, * FROM locp;
-- The executor should not let unexercised FDWs shut down
update utrtest set a = 1 where b = 'foo';
drop table utrtest;
drop table loct;
-- Test copy tuple routing
create table ctrtest (a int, b text) partition by list (a);
create table loct1 (a int check (a in (1)), b text);
create foreign table remp1 (a int check (a in (1)), b text) server loopback options (table_name 'loct1');
create table loct2 (a int check (a in (2)), b text);
create foreign table remp2 (b text, a int check (a in (2))) server loopback options (table_name 'loct2');
alter table ctrtest attach partition remp1 for values in (1);
alter table ctrtest attach partition remp2 for values in (2);
copy ctrtest from stdin;
1 foo
2 qux
\.
select tableoid::regclass, * FROM ctrtest;
select tableoid::regclass, * FROM remp1;
select tableoid::regclass, * FROM remp2;
-- Copying into foreign partitions directly should work as well
copy remp1 from stdin;
1 bar
\.
select tableoid::regclass, * FROM remp1;
drop table ctrtest;
drop table loct1;
drop table loct2;
-- ===================================================================
-- test COPY FROM
-- ===================================================================
create table loc2 (f1 int, f2 text);
alter table loc2 set (autovacuum_enabled = 'false');
create foreign table rem2 (f1 int, f2 text) server loopback options(table_name 'loc2');
-- Test basic functionality
copy rem2 from stdin;
1 foo
2 bar
\.
select * from rem2;
delete from rem2;
-- Test check constraints
alter table loc2 add constraint loc2_f1positive check (f1 >= 0);
alter foreign table rem2 add constraint rem2_f1positive check (f1 >= 0);
-- check constraint is enforced on the remote side, not locally
copy rem2 from stdin;
1 foo
2 bar
\.
copy rem2 from stdin; -- ERROR
-1 xyzzy
\.
select * from rem2;
alter foreign table rem2 drop constraint rem2_f1positive;
alter table loc2 drop constraint loc2_f1positive;
delete from rem2;
-- Test local triggers
create trigger trig_stmt_before before insert on rem2
for each statement execute procedure trigger_func();
create trigger trig_stmt_after after insert on rem2
for each statement execute procedure trigger_func();
create trigger trig_row_before before insert on rem2
for each row execute procedure trigger_data(23,'skidoo');
create trigger trig_row_after after insert on rem2
for each row execute procedure trigger_data(23,'skidoo');
copy rem2 from stdin;
1 foo
2 bar
\.
select * from rem2;
drop trigger trig_row_before on rem2;
drop trigger trig_row_after on rem2;
drop trigger trig_stmt_before on rem2;
drop trigger trig_stmt_after on rem2;
delete from rem2;
create trigger trig_row_before_insert before insert on rem2
for each row execute procedure trig_row_before_insupdate();
-- The new values are concatenated with ' triggered !'
copy rem2 from stdin;
1 foo
2 bar
\.
select * from rem2;
drop trigger trig_row_before_insert on rem2;
delete from rem2;
create trigger trig_null before insert on rem2
for each row execute procedure trig_null();
-- Nothing happens
copy rem2 from stdin;
1 foo
2 bar
\.
select * from rem2;
drop trigger trig_null on rem2;
delete from rem2;
-- Test remote triggers
create trigger trig_row_before_insert before insert on loc2
for each row execute procedure trig_row_before_insupdate();
-- The new values are concatenated with ' triggered !'
copy rem2 from stdin;
1 foo
2 bar
\.
select * from rem2;
drop trigger trig_row_before_insert on loc2;
delete from rem2;
create trigger trig_null before insert on loc2
for each row execute procedure trig_null();
-- Nothing happens
copy rem2 from stdin;
1 foo
2 bar
\.
select * from rem2;
drop trigger trig_null on loc2;
delete from rem2;
-- Test a combination of local and remote triggers
create trigger rem2_trig_row_before before insert on rem2
for each row execute procedure trigger_data(23,'skidoo');
create trigger rem2_trig_row_after after insert on rem2
for each row execute procedure trigger_data(23,'skidoo');
create trigger loc2_trig_row_before_insert before insert on loc2
for each row execute procedure trig_row_before_insupdate();
copy rem2 from stdin;
1 foo
2 bar
\.
select * from rem2;
drop trigger rem2_trig_row_before on rem2;
drop trigger rem2_trig_row_after on rem2;
drop trigger loc2_trig_row_before_insert on loc2;
delete from rem2;
-- ===================================================================
-- test IMPORT FOREIGN SCHEMA
-- ===================================================================

View File

@ -3037,11 +3037,9 @@ VALUES ('Albany', NULL, NULL, 'NY');
</para>
<para>
Partitions can also be foreign tables
(see <xref linkend="sql-createforeigntable"/>),
although these have some limitations that normal tables do not. For
example, data inserted into the partitioned table is not routed to
foreign table partitions.
Partitions can also be foreign tables, although they have some limitations
that normal tables do not; see <xref linkend="sql-createforeigntable"> for
more information.
</para>
<para>

View File

@ -694,6 +694,72 @@ EndForeignModify(EState *estate,
<literal>NULL</literal>, no action is taken during executor shutdown.
</para>
<para>
Tuples inserted into a partitioned table by <command>INSERT</command> or
<command>COPY FROM</command> are routed to partitions. If an FDW
supports routable foreign-table partitions, it should also provide the
following callback functions. These functions are also called when
<command>COPY FROM</command> is executed on a foreign table.
</para>
<para>
<programlisting>
void
BeginForeignInsert(ModifyTableState *mtstate,
ResultRelInfo *rinfo);
</programlisting>
Begin executing an insert operation on a foreign table. This routine is
called right before the first tuple is inserted into the foreign table
in both cases when it is the partition chosen for tuple routing and the
target specified in a <command>COPY FROM</command> command. It should
perform any initialization needed prior to the actual insertion.
Subsequently, <function>ExecForeignInsert</function> will be called for
each tuple to be inserted into the foreign table.
</para>
<para>
<literal>mtstate</literal> is the overall state of the
<structname>ModifyTable</structname> plan node being executed; global data about
the plan and execution state is available via this structure.
<literal>rinfo</literal> is the <structname>ResultRelInfo</structname> struct describing
the target foreign table. (The <structfield>ri_FdwState</structfield> field of
<structname>ResultRelInfo</structname> is available for the FDW to store any
private state it needs for this operation.)
</para>
<para>
When this is called by a <command>COPY FROM</command> command, the
plan-related global data in <literal>mtstate</literal> is not provided
and the <literal>planSlot</literal> parameter of
<function>ExecForeignInsert</function> subsequently called for each
inserted tuple is <literal>NULL</literal>, whether the foreign table is
the partition chosen for tuple routing or the target specified in the
command.
</para>
<para>
If the <function>BeginForeignInsert</function> pointer is set to
<literal>NULL</literal>, no action is taken for the initialization.
</para>
<para>
<programlisting>
void
EndForeignInsert(EState *estate,
ResultRelInfo *rinfo);
</programlisting>
End the insert operation and release resources. It is normally not important
to release palloc'd memory, but for example open files and connections
to remote servers should be cleaned up.
</para>
<para>
If the <function>EndForeignInsert</function> pointer is set to
<literal>NULL</literal>, no action is taken for the termination.
</para>
<para>
<programlisting>
int

View File

@ -402,8 +402,9 @@ COPY <replaceable class="parameter">count</replaceable>
</para>
<para>
<command>COPY FROM</command> can be used with plain tables and with views
that have <literal>INSTEAD OF INSERT</literal> triggers.
<command>COPY FROM</command> can be used with plain, foreign, or
partitioned tables or with views that have
<literal>INSTEAD OF INSERT</literal> triggers.
</para>
<para>

View File

@ -291,6 +291,9 @@ UPDATE <replaceable class="parameter">count</replaceable>
concurrent <command>UPDATE</command> or <command>DELETE</command> on the
same row may miss this row. For details see the section
<xref linkend="ddl-partitioning-declarative-limitations"/>.
Currently, rows cannot be moved from a partition that is a
foreign table to some other partition, but they can be moved into a foreign
table if the foreign data wrapper supports it.
</para>
</refsect1>

View File

@ -31,6 +31,7 @@
#include "commands/trigger.h"
#include "executor/execPartition.h"
#include "executor/executor.h"
#include "foreign/fdwapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "mb/pg_wchar.h"
@ -2302,6 +2303,7 @@ CopyFrom(CopyState cstate)
ResultRelInfo *resultRelInfo;
ResultRelInfo *saved_resultRelInfo = NULL;
EState *estate = CreateExecutorState(); /* for ExecConstraints() */
ModifyTableState *mtstate;
ExprContext *econtext;
TupleTableSlot *myslot;
MemoryContext oldcontext = CurrentMemoryContext;
@ -2323,11 +2325,12 @@ CopyFrom(CopyState cstate)
Assert(cstate->rel);
/*
* The target must be a plain relation or have an INSTEAD OF INSERT row
* trigger. (Currently, such triggers are only allowed on views, so we
* only hint about them in the view case.)
* The target must be a plain, foreign, or partitioned relation, or have
* an INSTEAD OF INSERT row trigger. (Currently, such triggers are only
* allowed on views, so we only hint about them in the view case.)
*/
if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
!(cstate->rel->trigdesc &&
cstate->rel->trigdesc->trig_insert_instead_row))
@ -2343,11 +2346,6 @@ CopyFrom(CopyState cstate)
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy to materialized view \"%s\"",
RelationGetRelationName(cstate->rel))));
else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy to foreign table \"%s\"",
RelationGetRelationName(cstate->rel))));
else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
@ -2454,6 +2452,9 @@ CopyFrom(CopyState cstate)
NULL,
0);
/* Verify the named relation is a valid target for INSERT */
CheckValidResultRel(resultRelInfo, CMD_INSERT);
ExecOpenIndices(resultRelInfo, false);
estate->es_result_relations = resultRelInfo;
@ -2466,6 +2467,21 @@ CopyFrom(CopyState cstate)
/* Triggers might need a slot as well */
estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate, NULL);
/*
* Set up a ModifyTableState so we can let FDW(s) init themselves for
* foreign-table result relation(s).
*/
mtstate = makeNode(ModifyTableState);
mtstate->ps.plan = NULL;
mtstate->ps.state = estate;
mtstate->operation = CMD_INSERT;
mtstate->resultRelInfo = estate->es_result_relations;
if (resultRelInfo->ri_FdwRoutine != NULL &&
resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
resultRelInfo);
/* Prepare to catch AFTER triggers. */
AfterTriggerBeginQuery();
@ -2507,11 +2523,12 @@ CopyFrom(CopyState cstate)
* expressions. Such triggers or expressions might query the table we're
* inserting to, and act differently if the tuples that have already been
* processed and prepared for insertion are not there. We also can't do
* it if the table is partitioned.
* it if the table is foreign or partitioned.
*/
if ((resultRelInfo->ri_TrigDesc != NULL &&
(resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
resultRelInfo->ri_FdwRoutine != NULL ||
cstate->partition_tuple_routing != NULL ||
cstate->volatile_defexprs)
{
@ -2626,19 +2643,13 @@ CopyFrom(CopyState cstate)
resultRelInfo = proute->partitions[leaf_part_index];
if (resultRelInfo == NULL)
{
resultRelInfo = ExecInitPartitionInfo(NULL,
resultRelInfo = ExecInitPartitionInfo(mtstate,
saved_resultRelInfo,
proute, estate,
leaf_part_index);
Assert(resultRelInfo != NULL);
}
/* We do not yet have a way to insert into a foreign partition */
if (resultRelInfo->ri_FdwRoutine)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot route inserted tuples to a foreign table")));
/*
* For ExecInsertIndexTuples() to work on the partition's indexes
*/
@ -2726,9 +2737,13 @@ CopyFrom(CopyState cstate)
resultRelInfo->ri_TrigDesc->trig_insert_before_row))
check_partition_constr = false;
/* Check the constraints of the tuple */
if (resultRelInfo->ri_RelationDesc->rd_att->constr ||
check_partition_constr)
/*
* If the target is a plain table, check the constraints of
* the tuple.
*/
if (resultRelInfo->ri_FdwRoutine == NULL &&
(resultRelInfo->ri_RelationDesc->rd_att->constr ||
check_partition_constr))
ExecConstraints(resultRelInfo, slot, estate, true);
if (useHeapMultiInsert)
@ -2760,10 +2775,32 @@ CopyFrom(CopyState cstate)
{
List *recheckIndexes = NIL;
/* OK, store the tuple and create index entries for it */
heap_insert(resultRelInfo->ri_RelationDesc, tuple, mycid,
hi_options, bistate);
/* OK, store the tuple */
if (resultRelInfo->ri_FdwRoutine != NULL)
{
slot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
resultRelInfo,
slot,
NULL);
if (slot == NULL) /* "do nothing" */
goto next_tuple;
/* FDW might have changed tuple */
tuple = ExecMaterializeSlot(slot);
/*
* AFTER ROW Triggers might reference the tableoid
* column, so initialize t_tableOid before evaluating
* them.
*/
tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
}
else
heap_insert(resultRelInfo->ri_RelationDesc, tuple,
mycid, hi_options, bistate);
/* And create index entries for it */
if (resultRelInfo->ri_NumIndices > 0)
recheckIndexes = ExecInsertIndexTuples(slot,
&(tuple->t_self),
@ -2781,13 +2818,14 @@ CopyFrom(CopyState cstate)
}
/*
* We count only tuples not suppressed by a BEFORE INSERT trigger;
* this is the same definition used by execMain.c for counting
* tuples inserted by an INSERT command.
* We count only tuples not suppressed by a BEFORE INSERT trigger
* or FDW; this is the same definition used by nodeModifyTable.c
* for counting tuples inserted by an INSERT command.
*/
processed++;
}
next_tuple:
/* Restore the saved ResultRelInfo */
if (saved_resultRelInfo)
{
@ -2828,11 +2866,17 @@ CopyFrom(CopyState cstate)
ExecResetTupleTable(estate->es_tupleTable, false);
/* Allow the FDW to shut down */
if (resultRelInfo->ri_FdwRoutine != NULL &&
resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
resultRelInfo);
ExecCloseIndices(resultRelInfo);
/* Close all the partitioned tables, leaf partitions, and their indices */
if (cstate->partition_tuple_routing)
ExecCleanupTupleRouting(cstate->partition_tuple_routing);
ExecCleanupTupleRouting(mtstate, cstate->partition_tuple_routing);
/* Close any trigger target relations */
ExecCleanUpTriggerState(estate);

View File

@ -1179,13 +1179,6 @@ CheckValidResultRel(ResultRelInfo *resultRelInfo, CmdType operation)
switch (operation)
{
case CMD_INSERT:
/*
* If foreign partition to do tuple-routing for, skip the
* check; it's disallowed elsewhere.
*/
if (resultRelInfo->ri_PartitionRoot)
break;
if (fdwroutine->ExecForeignInsert == NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@ -1378,6 +1371,7 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
resultRelInfo->ri_PartitionCheck = partition_check;
resultRelInfo->ri_PartitionRoot = partition_root;
resultRelInfo->ri_PartitionReadyForRouting = false;
}
/*

View File

@ -18,6 +18,7 @@
#include "catalog/pg_type.h"
#include "executor/execPartition.h"
#include "executor/executor.h"
#include "foreign/fdwapi.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
@ -55,12 +56,13 @@ static List *adjust_partition_tlist(List *tlist, TupleConversionMap *map);
* see ExecInitPartitionInfo. However, if the function is invoked for update
* tuple routing, caller would already have initialized ResultRelInfo's for
* some of the partitions, which are reused and assigned to their respective
* slot in the aforementioned array.
* slot in the aforementioned array. For such partitions, we delay setting
* up objects such as TupleConversionMap until those are actually chosen as
* the partitions to route tuples to. See ExecPrepareTupleRouting.
*/
PartitionTupleRouting *
ExecSetupPartitionTupleRouting(ModifyTableState *mtstate, Relation rel)
{
TupleDesc tupDesc = RelationGetDescr(rel);
List *leaf_parts;
ListCell *cell;
int i;
@ -141,11 +143,7 @@ ExecSetupPartitionTupleRouting(ModifyTableState *mtstate, Relation rel)
if (update_rri_index < num_update_rri &&
RelationGetRelid(update_rri[update_rri_index].ri_RelationDesc) == leaf_oid)
{
Relation partrel;
TupleDesc part_tupdesc;
leaf_part_rri = &update_rri[update_rri_index];
partrel = leaf_part_rri->ri_RelationDesc;
/*
* This is required in order to convert the partition's tuple to
@ -159,23 +157,6 @@ ExecSetupPartitionTupleRouting(ModifyTableState *mtstate, Relation rel)
proute->subplan_partition_offsets[update_rri_index] = i;
update_rri_index++;
part_tupdesc = RelationGetDescr(partrel);
/*
* Save a tuple conversion map to convert a tuple routed to this
* partition from the parent's type to the partition's.
*/
proute->parent_child_tupconv_maps[i] =
convert_tuples_by_name(tupDesc, part_tupdesc,
gettext_noop("could not convert row type"));
/*
* Verify result relation is a valid target for an INSERT. An
* UPDATE of a partition-key becomes a DELETE+INSERT operation, so
* this check is required even when the operation is CMD_UPDATE.
*/
CheckValidResultRel(leaf_part_rri, CMD_INSERT);
}
proute->partitions[i] = leaf_part_rri;
@ -347,10 +328,10 @@ ExecInitPartitionInfo(ModifyTableState *mtstate,
PartitionTupleRouting *proute,
EState *estate, int partidx)
{
ModifyTable *node = (ModifyTable *) mtstate->ps.plan;
Relation rootrel = resultRelInfo->ri_RelationDesc,
partrel;
ResultRelInfo *leaf_part_rri;
ModifyTable *node = mtstate ? (ModifyTable *) mtstate->ps.plan : NULL;
MemoryContext oldContext;
/*
@ -374,13 +355,6 @@ ExecInitPartitionInfo(ModifyTableState *mtstate,
leaf_part_rri->ri_PartitionLeafIndex = partidx;
/*
* Verify result relation is a valid target for an INSERT. An UPDATE of a
* partition-key becomes a DELETE+INSERT operation, so this check is still
* required when the operation is CMD_UPDATE.
*/
CheckValidResultRel(leaf_part_rri, CMD_INSERT);
/*
* Since we've just initialized this ResultRelInfo, it's not in any list
* attached to the estate as yet. Add it, so that it can be found later.
@ -393,6 +367,9 @@ ExecInitPartitionInfo(ModifyTableState *mtstate,
lappend(estate->es_tuple_routing_result_relations,
leaf_part_rri);
/* Set up information needed for routing tuples to this partition. */
ExecInitRoutingInfo(mtstate, estate, proute, leaf_part_rri, partidx);
/*
* Open partition indices. The user may have asked to check for conflicts
* within this leaf partition and do "nothing" instead of throwing an
@ -498,6 +475,7 @@ ExecInitPartitionInfo(ModifyTableState *mtstate,
returningList = map_partition_varattnos(returningList, firstVarno,
partrel, firstResultRel,
NULL);
leaf_part_rri->ri_returningList = returningList;
/*
* Initialize the projection itself.
@ -514,15 +492,6 @@ ExecInitPartitionInfo(ModifyTableState *mtstate,
&mtstate->ps, RelationGetDescr(partrel));
}
/*
* Save a tuple conversion map to convert a tuple routed to this partition
* from the parent's type to the partition's.
*/
proute->parent_child_tupconv_maps[partidx] =
convert_tuples_by_name(RelationGetDescr(rootrel),
RelationGetDescr(partrel),
gettext_noop("could not convert row type"));
/*
* If there is an ON CONFLICT clause, initialize state for it.
*/
@ -751,6 +720,50 @@ ExecInitPartitionInfo(ModifyTableState *mtstate,
return leaf_part_rri;
}
/*
* ExecInitRoutingInfo
* Set up information needed for routing tuples to a leaf partition if
* routable; else abort the operation
*/
void
ExecInitRoutingInfo(ModifyTableState *mtstate,
EState *estate,
PartitionTupleRouting *proute,
ResultRelInfo *partRelInfo,
int partidx)
{
MemoryContext oldContext;
/* Verify the partition is a valid target for INSERT */
CheckValidResultRel(partRelInfo, CMD_INSERT);
/*
* Switch into per-query memory context.
*/
oldContext = MemoryContextSwitchTo(estate->es_query_cxt);
/*
* Set up a tuple conversion map to convert a tuple routed to the
* partition from the parent's type to the partition's.
*/
proute->parent_child_tupconv_maps[partidx] =
convert_tuples_by_name(RelationGetDescr(partRelInfo->ri_PartitionRoot),
RelationGetDescr(partRelInfo->ri_RelationDesc),
gettext_noop("could not convert row type"));
/*
* If the partition is a foreign table, let the FDW init itself for
* routing tuples to the partition.
*/
if (partRelInfo->ri_FdwRoutine != NULL &&
partRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
partRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, partRelInfo);
MemoryContextSwitchTo(oldContext);
partRelInfo->ri_PartitionReadyForRouting = true;
}
/*
* ExecSetupChildParentMapForLeaf -- Initialize the per-leaf-partition
* child-to-root tuple conversion map array.
@ -853,7 +866,8 @@ ConvertPartitionTupleSlot(TupleConversionMap *map,
* Close all the partitioned tables, leaf partitions, and their indices.
*/
void
ExecCleanupTupleRouting(PartitionTupleRouting *proute)
ExecCleanupTupleRouting(ModifyTableState *mtstate,
PartitionTupleRouting *proute)
{
int i;
int subplan_index = 0;
@ -881,6 +895,13 @@ ExecCleanupTupleRouting(PartitionTupleRouting *proute)
if (resultRelInfo == NULL)
continue;
/* Allow any FDWs to shut down if they've been exercised */
if (resultRelInfo->ri_PartitionReadyForRouting &&
resultRelInfo->ri_FdwRoutine != NULL &&
resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
resultRelInfo->ri_FdwRoutine->EndForeignInsert(mtstate->ps.state,
resultRelInfo);
/*
* If this result rel is one of the UPDATE subplan result rels, let
* ExecEndPlan() close it. For INSERT or COPY,

View File

@ -1826,11 +1826,21 @@ ExecPrepareTupleRouting(ModifyTableState *mtstate,
proute, estate,
partidx);
/* We do not yet have a way to insert into a foreign partition */
if (partrel->ri_FdwRoutine)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot route inserted tuples to a foreign table")));
/*
* Set up information needed for routing tuples to the partition if we
* didn't yet (ExecInitRoutingInfo would abort the operation if the
* partition isn't routable).
*
* Note: an UPDATE of a partition key invokes an INSERT that moves the
* tuple to a new partition. This setup would be needed for a subplan
* partition of such an UPDATE that is chosen as the partition to route
* the tuple to. The reason we do this setup here rather than in
* ExecSetupPartitionTupleRouting is to avoid aborting such an UPDATE
* unnecessarily due to non-routable subplan partitions that may not be
* chosen for update tuple movement after all.
*/
if (!partrel->ri_PartitionReadyForRouting)
ExecInitRoutingInfo(mtstate, estate, proute, partrel, partidx);
/*
* Make it look like we are inserting into the partition.
@ -2531,6 +2541,7 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
{
List *rlist = (List *) lfirst(l);
resultRelInfo->ri_returningList = rlist;
resultRelInfo->ri_projectReturning =
ExecBuildProjectionInfo(rlist, econtext, slot, &mtstate->ps,
resultRelInfo->ri_RelationDesc->rd_att);
@ -2830,7 +2841,7 @@ ExecEndModifyTable(ModifyTableState *node)
/* Close all the partitioned tables, leaf partitions, and their indices */
if (node->mt_partition_tuple_routing)
ExecCleanupTupleRouting(node->mt_partition_tuple_routing);
ExecCleanupTupleRouting(node, node->mt_partition_tuple_routing);
/*
* Free the exprcontext

View File

@ -119,6 +119,11 @@ extern ResultRelInfo *ExecInitPartitionInfo(ModifyTableState *mtstate,
ResultRelInfo *resultRelInfo,
PartitionTupleRouting *proute,
EState *estate, int partidx);
extern void ExecInitRoutingInfo(ModifyTableState *mtstate,
EState *estate,
PartitionTupleRouting *proute,
ResultRelInfo *partRelInfo,
int partidx);
extern void ExecSetupChildParentMapForLeaf(PartitionTupleRouting *proute);
extern TupleConversionMap *TupConvMapForLeaf(PartitionTupleRouting *proute,
ResultRelInfo *rootRelInfo, int leaf_index);
@ -126,6 +131,7 @@ extern HeapTuple ConvertPartitionTupleSlot(TupleConversionMap *map,
HeapTuple tuple,
TupleTableSlot *new_slot,
TupleTableSlot **p_my_slot);
extern void ExecCleanupTupleRouting(PartitionTupleRouting *proute);
extern void ExecCleanupTupleRouting(ModifyTableState *mtstate,
PartitionTupleRouting *proute);
#endif /* EXECPARTITION_H */

View File

@ -98,6 +98,12 @@ typedef TupleTableSlot *(*ExecForeignDelete_function) (EState *estate,
typedef void (*EndForeignModify_function) (EState *estate,
ResultRelInfo *rinfo);
typedef void (*BeginForeignInsert_function) (ModifyTableState *mtstate,
ResultRelInfo *rinfo);
typedef void (*EndForeignInsert_function) (EState *estate,
ResultRelInfo *rinfo);
typedef int (*IsForeignRelUpdatable_function) (Relation rel);
typedef bool (*PlanDirectModify_function) (PlannerInfo *root,
@ -205,6 +211,8 @@ typedef struct FdwRoutine
ExecForeignUpdate_function ExecForeignUpdate;
ExecForeignDelete_function ExecForeignDelete;
EndForeignModify_function EndForeignModify;
BeginForeignInsert_function BeginForeignInsert;
EndForeignInsert_function EndForeignInsert;
IsForeignRelUpdatable_function IsForeignRelUpdatable;
PlanDirectModify_function PlanDirectModify;
BeginDirectModify_function BeginDirectModify;

View File

@ -444,6 +444,9 @@ typedef struct ResultRelInfo
/* for removing junk attributes from tuples */
JunkFilter *ri_junkFilter;
/* list of RETURNING expressions */
List *ri_returningList;
/* for computing a RETURNING list */
ProjectionInfo *ri_projectReturning;
@ -462,6 +465,9 @@ typedef struct ResultRelInfo
/* relation descriptor for root partitioned table */
Relation ri_PartitionRoot;
/* true if ready for tuple routing */
bool ri_PartitionReadyForRouting;
int ri_PartitionLeafIndex;
/* for running MERGE on this result relation */
MergeState *ri_mergeState;