Support "COPY view FROM" for views with INSTEAD OF INSERT triggers.

We just pass the data to the INSTEAD trigger.

Haribabu Kommi, reviewed by Dilip Kumar

Patch: <CAJrrPGcSQkrNkO+4PhLm4B8UQQQmU9YVUuqmtgM=pmzMfxWaWQ@mail.gmail.com>
This commit is contained in:
Tom Lane 2016-11-10 14:13:43 -05:00
parent e1b449bea9
commit 279c439c7f
4 changed files with 125 additions and 44 deletions

View File

@ -395,9 +395,15 @@ COPY <replaceable class="parameter">count</replaceable>
<title>Notes</title>
<para>
<command>COPY</command> can only be used with plain tables, not
<command>COPY TO</command> can only be used with plain tables, not
with views. However, you can write <literal>COPY (SELECT * FROM
<replaceable class="parameter">viewname</replaceable>) TO ...</literal>.
<replaceable class="parameter">viewname</replaceable>) TO ...</literal>
to copy the current contents of a view.
</para>
<para>
<command>COPY FROM</command> can be used with plain tables and with views
that have <literal>INSTEAD OF INSERT</> triggers.
</para>
<para>

View File

@ -864,8 +864,8 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, uint64 *processed)
* statement.
*
* In the case that columns are specified in the attribute list,
* create a ColumnRef and ResTarget for each column and add them to
* the target list for the resulting SELECT statement.
* create a ColumnRef and ResTarget for each column and add them
* to the target list for the resulting SELECT statement.
*/
if (!stmt->attlist)
{
@ -2269,13 +2269,21 @@ CopyFrom(CopyState cstate)
Assert(cstate->rel);
if (cstate->rel->rd_rel->relkind != RELKIND_RELATION)
/*
* The target must be a plain relation or have an INSTEAD OF INSERT row
* trigger. (Currently, such triggers are only allowed on views, so we
* only hint about them in the view case.)
*/
if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
!(cstate->rel->trigdesc &&
cstate->rel->trigdesc->trig_insert_instead_row))
{
if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy to view \"%s\"",
RelationGetRelationName(cstate->rel))));
RelationGetRelationName(cstate->rel)),
errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
@ -2496,52 +2504,64 @@ CopyFrom(CopyState cstate)
if (!skip_tuple)
{
/* Check the constraints of the tuple */
if (cstate->rel->rd_att->constr)
ExecConstraints(resultRelInfo, slot, estate);
if (useHeapMultiInsert)
if (resultRelInfo->ri_TrigDesc &&
resultRelInfo->ri_TrigDesc->trig_insert_instead_row)
{
/* Add this tuple to the tuple buffer */
if (nBufferedTuples == 0)
firstBufferedLineNo = cstate->cur_lineno;
bufferedTuples[nBufferedTuples++] = tuple;
bufferedTuplesSize += tuple->t_len;
/*
* If the buffer filled up, flush it. Also flush if the total
* size of all the tuples in the buffer becomes large, to
* avoid using large amounts of memory for the buffers when
* the tuples are exceptionally wide.
*/
if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
bufferedTuplesSize > 65535)
{
CopyFromInsertBatch(cstate, estate, mycid, hi_options,
resultRelInfo, myslot, bistate,
nBufferedTuples, bufferedTuples,
firstBufferedLineNo);
nBufferedTuples = 0;
bufferedTuplesSize = 0;
}
/* Pass the data to the INSTEAD ROW INSERT trigger */
ExecIRInsertTriggers(estate, resultRelInfo, slot);
}
else
{
List *recheckIndexes = NIL;
/* Check the constraints of the tuple */
if (cstate->rel->rd_att->constr)
ExecConstraints(resultRelInfo, slot, estate);
/* OK, store the tuple and create index entries for it */
heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
if (useHeapMultiInsert)
{
/* Add this tuple to the tuple buffer */
if (nBufferedTuples == 0)
firstBufferedLineNo = cstate->cur_lineno;
bufferedTuples[nBufferedTuples++] = tuple;
bufferedTuplesSize += tuple->t_len;
if (resultRelInfo->ri_NumIndices > 0)
recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
estate, false, NULL,
NIL);
/*
* If the buffer filled up, flush it. Also flush if the
* total size of all the tuples in the buffer becomes
* large, to avoid using large amounts of memory for the
* buffer when the tuples are exceptionally wide.
*/
if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
bufferedTuplesSize > 65535)
{
CopyFromInsertBatch(cstate, estate, mycid, hi_options,
resultRelInfo, myslot, bistate,
nBufferedTuples, bufferedTuples,
firstBufferedLineNo);
nBufferedTuples = 0;
bufferedTuplesSize = 0;
}
}
else
{
List *recheckIndexes = NIL;
/* AFTER ROW INSERT Triggers */
ExecARInsertTriggers(estate, resultRelInfo, tuple,
recheckIndexes);
/* OK, store the tuple and create index entries for it */
heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
list_free(recheckIndexes);
if (resultRelInfo->ri_NumIndices > 0)
recheckIndexes = ExecInsertIndexTuples(slot,
&(tuple->t_self),
estate,
false,
NULL,
NIL);
/* AFTER ROW INSERT Triggers */
ExecARInsertTriggers(estate, resultRelInfo, tuple,
recheckIndexes);
list_free(recheckIndexes);
}
}
/*

View File

@ -535,6 +535,29 @@ COPY rls_t1 (a, b) TO stdout;
2 3
4 1
RESET SESSION AUTHORIZATION;
-- test with INSTEAD OF INSERT trigger on a view
CREATE TABLE instead_of_insert_tbl(id serial, name text);
CREATE VIEW instead_of_insert_tbl_view AS SELECT ''::text AS str;
COPY instead_of_insert_tbl_view FROM stdin; -- fail
ERROR: cannot copy to view "instead_of_insert_tbl_view"
HINT: To enable copying to a view, provide an INSTEAD OF INSERT trigger.
CREATE FUNCTION fun_instead_of_insert_tbl() RETURNS trigger AS $$
BEGIN
INSERT INTO instead_of_insert_tbl (name) VALUES (NEW.str);
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trig_instead_of_insert_tbl_view
INSTEAD OF INSERT ON instead_of_insert_tbl_view
FOR EACH ROW EXECUTE PROCEDURE fun_instead_of_insert_tbl();
COPY instead_of_insert_tbl_view FROM stdin;
SELECT * FROM instead_of_insert_tbl;
id | name
----+-------
1 | test1
(1 row)
-- clean up
DROP TABLE forcetest;
DROP TABLE vistest;
DROP FUNCTION truncate_in_subxact();
@ -544,3 +567,6 @@ DROP ROLE regress_rls_copy_user;
DROP ROLE regress_rls_copy_user_colperms;
DROP FUNCTION fn_x_before();
DROP FUNCTION fn_x_after();
DROP TABLE instead_of_insert_tbl;
DROP VIEW instead_of_insert_tbl_view;
DROP FUNCTION fun_instead_of_insert_tbl();

View File

@ -387,6 +387,32 @@ COPY rls_t1 (a, b) TO stdout;
RESET SESSION AUTHORIZATION;
-- test with INSTEAD OF INSERT trigger on a view
CREATE TABLE instead_of_insert_tbl(id serial, name text);
CREATE VIEW instead_of_insert_tbl_view AS SELECT ''::text AS str;
COPY instead_of_insert_tbl_view FROM stdin; -- fail
test1
\.
CREATE FUNCTION fun_instead_of_insert_tbl() RETURNS trigger AS $$
BEGIN
INSERT INTO instead_of_insert_tbl (name) VALUES (NEW.str);
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trig_instead_of_insert_tbl_view
INSTEAD OF INSERT ON instead_of_insert_tbl_view
FOR EACH ROW EXECUTE PROCEDURE fun_instead_of_insert_tbl();
COPY instead_of_insert_tbl_view FROM stdin;
test1
\.
SELECT * FROM instead_of_insert_tbl;
-- clean up
DROP TABLE forcetest;
DROP TABLE vistest;
DROP FUNCTION truncate_in_subxact();
@ -396,3 +422,6 @@ DROP ROLE regress_rls_copy_user;
DROP ROLE regress_rls_copy_user_colperms;
DROP FUNCTION fn_x_before();
DROP FUNCTION fn_x_after();
DROP TABLE instead_of_insert_tbl;
DROP VIEW instead_of_insert_tbl_view;
DROP FUNCTION fun_instead_of_insert_tbl();