WITH CHECK OPTION support for auto-updatable VIEWs

For simple views which are automatically updatable, this patch allows
the user to specify what level of checking should be done on records
being inserted or updated.  For 'LOCAL CHECK', new tuples are validated
against the conditionals of the view they are being inserted into, while
for 'CASCADED CHECK' the new tuples are validated against the
conditionals for all views involved (from the top down).

This option is part of the SQL specification.

Dean Rasheed, reviewed by Pavel Stehule
This commit is contained in:
Stephen Frost 2013-07-18 17:10:16 -04:00
parent 6f9e39bc99
commit 4cbe3ac3e8
33 changed files with 1245 additions and 107 deletions

View File

@ -28,6 +28,11 @@ ALTER VIEW [ IF EXISTS ] <replaceable class="parameter">name</replaceable> RENAM
ALTER VIEW [ IF EXISTS ] <replaceable class="parameter">name</replaceable> SET SCHEMA <replaceable class="parameter">new_schema</replaceable>
ALTER VIEW [ IF EXISTS ] <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">view_option_name</replaceable> [= <replaceable class="parameter">view_option_value</replaceable>] [, ... ] )
ALTER VIEW [ IF EXISTS ] <replaceable class="parameter">name</replaceable> RESET ( <replaceable class="parameter">view_option_name</replaceable> [, ... ] )
<phrase>where <replaceable class="parameter">view_option_name</replaceable> can be one of:</phrase>
security_barrier [ <replaceable class="parameter">boolean</replaceable> ]
check_option [ <replaceable class="parameter">text</replaceable> (<literal>local</literal> or <literal>cascaded</literal>) ]
</synopsis>
</refsynopsisdiv>

View File

@ -24,6 +24,12 @@ PostgreSQL documentation
CREATE [ OR REPLACE ] [ TEMP | TEMPORARY ] [ RECURSIVE ] VIEW <replaceable class="PARAMETER">name</replaceable> [ ( <replaceable class="PARAMETER">column_name</replaceable> [, ...] ) ]
[ WITH ( <replaceable class="PARAMETER">view_option_name</replaceable> [= <replaceable class="PARAMETER">view_option_value</replaceable>] [, ... ] ) ]
AS <replaceable class="PARAMETER">query</replaceable>
[ WITH [ CASCADED | LOCAL ] CHECK OPTION ]
<phrase>where <replaceable class="parameter">view_option_name</replaceable> can be one of:</phrase>
security_barrier [ <replaceable class="parameter">boolean</replaceable> ]
check_option [ <replaceable class="parameter">text</replaceable> (<literal>local</literal> or <literal>cascaded</literal>) ]
</synopsis>
</refsynopsisdiv>
@ -120,10 +126,33 @@ CREATE VIEW <replaceable>name</> AS WITH RECURSIVE <replaceable>name</> (<replac
<term><literal>WITH ( <replaceable class="PARAMETER">view_option_name</replaceable> [= <replaceable class="PARAMETER">view_option_value</replaceable>] [, ... ] )</literal></term>
<listitem>
<para>
This clause specifies optional parameters for a view; currently, the
only supported parameter name is <literal>security_barrier</literal>,
which should be enabled when a view is intended to provide row-level
security. See <xref linkend="rules-privileges"> for full details.
This clause specifies optional parameters for a view; the following
parameters are supported:
<variablelist>
<varlistentry>
<term><literal>security_barrier(boolean)</literal></term>
<listitem>
<para>
This should be used if the view is intended to provide row-level
security. See <xref linkend="rules-privileges"> for full details.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><literal>check_option(text)</literal></term>
<listitem>
<para>
This parameter may be either <literal>local</> or
<literal>cascaded</>, and is equivalent to specifying
<literal>WITH [ CASCADED | LOCAL ] CHECK OPTION</> (see below).
This option can be changed on existing views using <xref
linkend="sql-alterview">.
</para>
</listitem>
</varlistentry>
</variablelist>
</para>
</listitem>
</varlistentry>
@ -138,6 +167,77 @@ CREATE VIEW <replaceable>name</> AS WITH RECURSIVE <replaceable>name</> (<replac
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><literal>WITH [ CASCADED | LOCAL ] CHECK OPTION</literal></term>
<listitem>
<para>
<indexterm zone="SQL-CREATEVIEW">
<primary>CHECK OPTION</primary>
</indexterm>
<indexterm zone="SQL-CREATEVIEW">
<primary>WITH CHECK OPTION</primary>
</indexterm>
This option controls the behavior of automatically updatable views. When
this option is specified, <command>INSERT</> and <command>UPDATE</>
commands on the view will be checked to ensure that new rows satisfy the
view-defining condition (that is, the new rows are checked to ensure that
they are visible through the view). If they are not, the update will be
rejected. If the <literal>CHECK OPTION</> is not specified,
<command>INSERT</> and <command>UPDATE</> commands on the view are
allowed to create rows that are not visible through the view. The
following check options are supported:
<variablelist>
<varlistentry>
<term><literal>LOCAL</literal></term>
<listitem>
<para>
New rows are only checked against the conditions defined directly in
the view itself. Any conditions defined on underlying base views are
not checked (unless they also specify the <literal>CHECK OPTION</>).
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><literal>CASCADED</literal></term>
<listitem>
<para>
New rows are checked against the conditions of the view and all
underlying base views. If the <literal>CHECK OPTION</> is specified,
and neither <literal>LOCAL</> nor <literal>CASCADED</> is specified,
then <literal>CASCADED</> is assumed.
</para>
</listitem>
</varlistentry>
</variablelist>
</para>
<para>
The <literal>CHECK OPTION</> may not be used with <literal>RECURSIVE</>
views.
</para>
<para>
Note that the <literal>CHECK OPTION</> is only supported on views that
are automatically updatable, and do not have <literal>INSTEAD OF</>
triggers or <literal>INSTEAD</> rules. If an automatically updatable
view is defined on top of a base view that has <literal>INSTEAD OF</>
triggers, then the <literal>LOCAL CHECK OPTION</> may be used to check
the conditions on the automatically updatable view, but the conditions
on the base view with <literal>INSTEAD OF</> triggers will not be
checked (a cascaded check option will not cascade down to a
trigger-updatable view, and any check options defined directly on a
trigger-updatable view will be ignored). If the view or any of its base
relations has an <literal>INSTEAD</> rule that causes the
<command>INSERT</> or <command>UPDATE</> command to be rewritten, then
all check options will be ignored in the rewritten query, including any
checks from automatically updatable views defined on top of the relation
with the <literal>INSTEAD</> rule.
</para>
</listitem>
</varlistentry>
</variablelist>
</refsect1>
@ -256,7 +356,9 @@ CREATE VIEW vista AS SELECT text 'Hello World' AS hello;
condition, and thus is no longer visible through the view. Similarly,
an <command>INSERT</> command can potentially insert base-relation rows
that do not satisfy the <literal>WHERE</> condition and thus are not
visible through the view.
visible through the view. The <literal>CHECK OPTION</> may be used to
prevent <command>INSERT</> and <command>UPDATE</> commands from creating
such rows that are not visible through the view.
</para>
<para>
@ -300,6 +402,38 @@ CREATE VIEW comedies AS
the table will not be part of the view.
</para>
<para>
Create a view with <literal>LOCAL CHECK OPTION</>:
<programlisting>
CREATE VIEW universal_comedies AS
SELECT *
FROM comedies
WHERE classification = 'U'
WITH LOCAL CHECK OPTION;
</programlisting>
This will create a view based on the <literal>comedies</> view, showing
only films with <literal>kind = 'Comedy'</> and
<literal>classification = 'U'</>. Any attempt to <command>INSERT</> or
<command>UPDATE</> a row in the view will be rejected if the new row
doesn't have <literal>classification = 'U'</>, but the film
<literal>kind</> will not be checked.
</para>
<para>
Create a view with <literal>CASCADED CHECK OPTION</>:
<programlisting>
CREATE VIEW pg_comedies AS
SELECT *
FROM comedies
WHERE classification = 'PG'
WITH CASCADED CHECK OPTION;
</programlisting>
This will create a view that checks both the <literal>kind</> and
<literal>classification</> of new rows.
</para>
<para>
Create a recursive view consisting of the numbers from 1 to 100:
<programlisting>
@ -313,64 +447,11 @@ UNION ALL
<refsect1>
<title>Compatibility</title>
<para>
The SQL standard specifies some additional capabilities for the
<command>CREATE VIEW</command> statement:
<synopsis>
CREATE VIEW <replaceable class="parameter">name</replaceable> [ ( <replaceable class="parameter">column_name</replaceable> [, ...] ) ]
AS <replaceable class="PARAMETER">query</replaceable>
[ WITH [ CASCADED | LOCAL ] CHECK OPTION ]
</synopsis>
</para>
<para>
The optional clauses for the full SQL command are:
<variablelist>
<varlistentry>
<term><literal>CHECK OPTION</literal></term>
<listitem>
<para>
This option controls the behavior of automatically updatable views.
When given, <command>INSERT</> and <command>UPDATE</> commands on
the view will be checked to ensure new rows satisfy the
view-defining condition (that is, the new rows would be visible
through the view). If they do not, the update will be rejected.
Without <literal>CHECK OPTION</literal>, <command>INSERT</> and
<command>UPDATE</> commands on the view are allowed to create rows
that are not visible through the view. (The latter behavior is the
only one currently provided by <productname>PostgreSQL</>.)
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><literal>LOCAL</literal></term>
<listitem>
<para>
Check for integrity on this view.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><literal>CASCADED</literal></term>
<listitem>
<para>
Check for integrity on this view and on any dependent
view. <literal>CASCADED</> is assumed if neither
<literal>CASCADED</> nor <literal>LOCAL</> is specified.
</para>
</listitem>
</varlistentry>
</variablelist>
</para>
<para>
<command>CREATE OR REPLACE VIEW</command> is a
<productname>PostgreSQL</productname> language extension.
So is the concept of a temporary view.
The <literal>WITH</> clause is an extension as well.
The <literal>WITH ( ... )</> clause is an extension as well.
</para>
</refsect1>

View File

@ -24,6 +24,7 @@
#include "catalog/pg_type.h"
#include "commands/defrem.h"
#include "commands/tablespace.h"
#include "commands/view.h"
#include "nodes/makefuncs.h"
#include "utils/array.h"
#include "utils/attoptcache.h"
@ -248,6 +249,17 @@ static relopt_string stringRelOpts[] =
gistValidateBufferingOption,
"auto"
},
{
{
"check_option",
"View has WITH CHECK OPTION defined (local or cascaded).",
RELOPT_KIND_VIEW
},
0,
true,
validateWithCheckOption,
NULL
},
/* list terminator */
{{NULL}}
};
@ -1152,6 +1164,8 @@ default_reloptions(Datum reloptions, bool validate, relopt_kind kind)
offsetof(StdRdOptions, autovacuum) +offsetof(AutoVacOpts, analyze_scale_factor)},
{"security_barrier", RELOPT_TYPE_BOOL,
offsetof(StdRdOptions, security_barrier)},
{"check_option", RELOPT_TYPE_STRING,
offsetof(StdRdOptions, check_option_offset)},
};
options = parseRelOptions(reloptions, validate, kind, &numoptions);

View File

@ -2494,7 +2494,13 @@ CREATE VIEW views AS
ELSE null END
AS character_data) AS view_definition,
CAST('NONE' AS character_data) AS check_option,
CAST(
CASE WHEN 'check_option=cascaded' = ANY (c.reloptions)
THEN 'CASCADED'
WHEN 'check_option=local' = ANY (c.reloptions)
THEN 'LOCAL'
ELSE 'NONE' END
AS character_data) AS check_option,
CAST(
-- (1 << CMD_UPDATE) + (1 << CMD_DELETE)

View File

@ -227,7 +227,7 @@ F311 Schema definition statement NO
F311 Schema definition statement 01 CREATE SCHEMA YES
F311 Schema definition statement 02 CREATE TABLE for persistent base tables YES
F311 Schema definition statement 03 CREATE VIEW YES
F311 Schema definition statement 04 CREATE VIEW: WITH CHECK OPTION NO
F311 Schema definition statement 04 CREATE VIEW: WITH CHECK OPTION YES
F311 Schema definition statement 05 GRANT statement YES
F312 MERGE statement NO
F313 Enhanced MERGE statement NO
@ -301,7 +301,7 @@ F711 ALTER domain YES
F721 Deferrable constraints NO foreign and unique keys only
F731 INSERT column privileges YES
F741 Referential MATCH types NO no partial match yet
F751 View CHECK enhancements NO
F751 View CHECK enhancements YES
F761 Session management YES
F762 CURRENT_CATALOG YES
F763 CURRENT_SCHEMA YES

View File

@ -8774,6 +8774,42 @@ ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation,
break;
}
/* Special-case validation of view options */
if (rel->rd_rel->relkind == RELKIND_VIEW)
{
Query *view_query = get_view_query(rel);
List *view_options = untransformRelOptions(newOptions);
ListCell *cell;
bool check_option = false;
bool security_barrier = false;
foreach(cell, view_options)
{
DefElem *defel = (DefElem *) lfirst(cell);
if (pg_strcasecmp(defel->defname, "check_option") == 0)
check_option = true;
if (pg_strcasecmp(defel->defname, "security_barrier") == 0)
security_barrier = defGetBoolean(defel);
}
/*
* If the check option is specified, look to see if the view is
* actually auto-updatable or not.
*/
if (check_option)
{
const char *view_updatable_error =
view_query_is_auto_updatable(view_query, security_barrier);
if (view_updatable_error)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("WITH CHECK OPTION is supported only on auto-updatable views"),
errhint("%s", view_updatable_error)));
}
}
/*
* All we need do here is update the pg_class row; the new options will be
* propagated into relcaches during post-commit cache inval.

View File

@ -27,6 +27,7 @@
#include "parser/parse_relation.h"
#include "rewrite/rewriteDefine.h"
#include "rewrite/rewriteManip.h"
#include "rewrite/rewriteHandler.h"
#include "rewrite/rewriteSupport.h"
#include "utils/acl.h"
#include "utils/builtins.h"
@ -37,6 +38,24 @@
static void checkViewTupleDesc(TupleDesc newdesc, TupleDesc olddesc);
/*---------------------------------------------------------------------
* Validator for "check_option" reloption on views. The allowed values
* are "local" and "cascaded".
*/
void
validateWithCheckOption(char *value)
{
if (value == NULL ||
(pg_strcasecmp(value, "local") != 0 &&
pg_strcasecmp(value, "cascaded") != 0))
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid value for \"check_option\" option"),
errdetail("Valid values are \"local\", and \"cascaded\".")));
}
}
/*---------------------------------------------------------------------
* DefineVirtualRelation
*
@ -374,6 +393,9 @@ DefineView(ViewStmt *stmt, const char *queryString)
Query *viewParse;
Oid viewOid;
RangeVar *view;
ListCell *cell;
bool check_option;
bool security_barrier;
/*
* Run parse analysis to convert the raw parse tree to a Query. Note this
@ -410,6 +432,52 @@ DefineView(ViewStmt *stmt, const char *queryString)
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("views must not contain data-modifying statements in WITH")));
/*
* If the user specified the WITH CHECK OPTION, add it to the list of
* reloptions.
*/
if (stmt->withCheckOption == LOCAL_CHECK_OPTION)
stmt->options = lappend(stmt->options,
makeDefElem("check_option",
(Node *) makeString("local")));
else if (stmt->withCheckOption == CASCADED_CHECK_OPTION)
stmt->options = lappend(stmt->options,
makeDefElem("check_option",
(Node *) makeString("cascaded")));
/*
* Check that the view is auto-updatable if WITH CHECK OPTION was
* specified.
*/
check_option = false;
security_barrier = false;
foreach(cell, stmt->options)
{
DefElem *defel = (DefElem *) lfirst(cell);
if (pg_strcasecmp(defel->defname, "check_option") == 0)
check_option = true;
if (pg_strcasecmp(defel->defname, "security_barrier") == 0)
security_barrier = defGetBoolean(defel);
}
/*
* If the check option is specified, look to see if the view is
* actually auto-updatable or not.
*/
if (check_option)
{
const char *view_updatable_error =
view_query_is_auto_updatable(viewParse, security_barrier);
if (view_updatable_error)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("WITH CHECK OPTION is supported only on auto-updatable views"),
errhint("%s", view_updatable_error)));
}
/*
* If a list of column names was given, run through and insert these into
* the actual query tree. - thomas 2000-03-08

View File

@ -1623,6 +1623,49 @@ ExecConstraints(ResultRelInfo *resultRelInfo,
}
}
/*
* ExecWithCheckOptions -- check that tuple satisfies any WITH CHECK OPTIONs
*/
void
ExecWithCheckOptions(ResultRelInfo *resultRelInfo,
TupleTableSlot *slot, EState *estate)
{
ExprContext *econtext;
ListCell *l1, *l2;
/*
* We will use the EState's per-tuple context for evaluating constraint
* expressions (creating it if it's not already there).
*/
econtext = GetPerTupleExprContext(estate);
/* Arrange for econtext's scan tuple to be the tuple under test */
econtext->ecxt_scantuple = slot;
/* Check each of the constraints */
forboth(l1, resultRelInfo->ri_WithCheckOptions,
l2, resultRelInfo->ri_WithCheckOptionExprs)
{
WithCheckOption *wco = (WithCheckOption *) lfirst(l1);
ExprState *wcoExpr = (ExprState *) lfirst(l2);
/*
* WITH CHECK OPTION checks are intended to ensure that the new tuple
* is visible in the view. If the view's qual evaluates to NULL, then
* the new tuple won't be included in the view. Therefore we need to
* tell ExecQual to return FALSE for NULL (the opposite of what we do
* above for CHECK constraints).
*/
if (!ExecQual((List *) wcoExpr, econtext, false))
ereport(ERROR,
(errcode(ERRCODE_WITH_CHECK_OPTION_VIOLATION),
errmsg("new row violates WITH CHECK OPTION for view \"%s\"",
wco->viewname),
errdetail("Failing row contains %s.",
ExecBuildSlotValueDescription(slot, 64))));
}
}
/*
* ExecBuildSlotValueDescription -- construct a string representing a tuple
*

View File

@ -281,6 +281,10 @@ ExecInsert(TupleTableSlot *slot,
list_free(recheckIndexes);
/* Check any WITH CHECK OPTION constraints */
if (resultRelInfo->ri_WithCheckOptions != NIL)
ExecWithCheckOptions(resultRelInfo, slot, estate);
/* Process RETURNING if present */
if (resultRelInfo->ri_projectReturning)
return ExecProcessReturning(resultRelInfo->ri_projectReturning,
@ -777,6 +781,10 @@ lreplace:;
list_free(recheckIndexes);
/* Check any WITH CHECK OPTION constraints */
if (resultRelInfo->ri_WithCheckOptions != NIL)
ExecWithCheckOptions(resultRelInfo, slot, estate);
/* Process RETURNING if present */
if (resultRelInfo->ri_projectReturning)
return ExecProcessReturning(resultRelInfo->ri_projectReturning,
@ -1129,6 +1137,31 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
estate->es_result_relation_info = saved_resultRelInfo;
/*
* Initialize any WITH CHECK OPTION constraints if needed.
*/
resultRelInfo = mtstate->resultRelInfo;
i = 0;
foreach(l, node->withCheckOptionLists)
{
List *wcoList = (List *) lfirst(l);
List *wcoExprs = NIL;
ListCell *ll;
foreach(ll, wcoList)
{
WithCheckOption *wco = (WithCheckOption *) lfirst(ll);
ExprState *wcoExpr = ExecInitExpr((Expr *) wco->qual,
mtstate->mt_plans[i]);
wcoExprs = lappend(wcoExprs, wcoExpr);
}
resultRelInfo->ri_WithCheckOptions = wcoList;
resultRelInfo->ri_WithCheckOptionExprs = wcoExprs;
resultRelInfo++;
i++;
}
/*
* Initialize RETURNING projections if needed.
*/

View File

@ -178,6 +178,7 @@ _copyModifyTable(const ModifyTable *from)
COPY_NODE_FIELD(resultRelations);
COPY_SCALAR_FIELD(resultRelIndex);
COPY_NODE_FIELD(plans);
COPY_NODE_FIELD(withCheckOptionLists);
COPY_NODE_FIELD(returningLists);
COPY_NODE_FIELD(fdwPrivLists);
COPY_NODE_FIELD(rowMarks);
@ -2003,6 +2004,18 @@ _copyRangeTblEntry(const RangeTblEntry *from)
return newnode;
}
static WithCheckOption *
_copyWithCheckOption(const WithCheckOption *from)
{
WithCheckOption *newnode = makeNode(WithCheckOption);
COPY_STRING_FIELD(viewname);
COPY_NODE_FIELD(qual);
COPY_SCALAR_FIELD(cascaded);
return newnode;
}
static SortGroupClause *
_copySortGroupClause(const SortGroupClause *from)
{
@ -2446,6 +2459,7 @@ _copyQuery(const Query *from)
COPY_NODE_FIELD(rtable);
COPY_NODE_FIELD(jointree);
COPY_NODE_FIELD(targetList);
COPY_NODE_FIELD(withCheckOptions);
COPY_NODE_FIELD(returningList);
COPY_NODE_FIELD(groupClause);
COPY_NODE_FIELD(havingQual);
@ -3075,6 +3089,7 @@ _copyViewStmt(const ViewStmt *from)
COPY_NODE_FIELD(query);
COPY_SCALAR_FIELD(replace);
COPY_NODE_FIELD(options);
COPY_SCALAR_FIELD(withCheckOption);
return newnode;
}
@ -4517,6 +4532,9 @@ copyObject(const void *from)
case T_RangeTblEntry:
retval = _copyRangeTblEntry(from);
break;
case T_WithCheckOption:
retval = _copyWithCheckOption(from);
break;
case T_SortGroupClause:
retval = _copySortGroupClause(from);
break;

View File

@ -853,6 +853,7 @@ _equalQuery(const Query *a, const Query *b)
COMPARE_NODE_FIELD(rtable);
COMPARE_NODE_FIELD(jointree);
COMPARE_NODE_FIELD(targetList);
COMPARE_NODE_FIELD(withCheckOptions);
COMPARE_NODE_FIELD(returningList);
COMPARE_NODE_FIELD(groupClause);
COMPARE_NODE_FIELD(havingQual);
@ -1382,6 +1383,7 @@ _equalViewStmt(const ViewStmt *a, const ViewStmt *b)
COMPARE_NODE_FIELD(query);
COMPARE_SCALAR_FIELD(replace);
COMPARE_NODE_FIELD(options);
COMPARE_SCALAR_FIELD(withCheckOption);
return true;
}
@ -2253,6 +2255,16 @@ _equalRangeTblEntry(const RangeTblEntry *a, const RangeTblEntry *b)
return true;
}
static bool
_equalWithCheckOption(const WithCheckOption *a, const WithCheckOption *b)
{
COMPARE_STRING_FIELD(viewname);
COMPARE_NODE_FIELD(qual);
COMPARE_SCALAR_FIELD(cascaded);
return true;
}
static bool
_equalSortGroupClause(const SortGroupClause *a, const SortGroupClause *b)
{
@ -2987,6 +2999,9 @@ equal(const void *a, const void *b)
case T_RangeTblEntry:
retval = _equalRangeTblEntry(a, b);
break;
case T_WithCheckOption:
retval = _equalWithCheckOption(a, b);
break;
case T_SortGroupClause:
retval = _equalSortGroupClause(a, b);
break;

View File

@ -1556,6 +1556,8 @@ expression_tree_walker(Node *node,
case T_SortGroupClause:
/* primitive node types with no expression subnodes */
break;
case T_WithCheckOption:
return walker(((WithCheckOption *) node)->qual, context);
case T_Aggref:
{
Aggref *expr = (Aggref *) node;
@ -1873,6 +1875,8 @@ query_tree_walker(Query *query,
if (walker((Node *) query->targetList, context))
return true;
if (walker((Node *) query->withCheckOptions, context))
return true;
if (walker((Node *) query->returningList, context))
return true;
if (walker((Node *) query->jointree, context))
@ -2074,6 +2078,15 @@ expression_tree_mutator(Node *node,
case T_RangeTblRef:
case T_SortGroupClause:
return (Node *) copyObject(node);
case T_WithCheckOption:
{
WithCheckOption *wco = (WithCheckOption *) node;
WithCheckOption *newnode;
FLATCOPY(newnode, wco, WithCheckOption);
MUTATE(newnode->qual, wco->qual, Node *);
return (Node *) newnode;
}
case T_Aggref:
{
Aggref *aggref = (Aggref *) node;
@ -2589,6 +2602,7 @@ query_tree_mutator(Query *query,
}
MUTATE(query->targetList, query->targetList, List *);
MUTATE(query->withCheckOptions, query->withCheckOptions, List *);
MUTATE(query->returningList, query->returningList, List *);
MUTATE(query->jointree, query->jointree, FromExpr *);
MUTATE(query->setOperations, query->setOperations, Node *);

View File

@ -332,6 +332,7 @@ _outModifyTable(StringInfo str, const ModifyTable *node)
WRITE_NODE_FIELD(resultRelations);
WRITE_INT_FIELD(resultRelIndex);
WRITE_NODE_FIELD(plans);
WRITE_NODE_FIELD(withCheckOptionLists);
WRITE_NODE_FIELD(returningLists);
WRITE_NODE_FIELD(fdwPrivLists);
WRITE_NODE_FIELD(rowMarks);
@ -2247,6 +2248,7 @@ _outQuery(StringInfo str, const Query *node)
WRITE_NODE_FIELD(rtable);
WRITE_NODE_FIELD(jointree);
WRITE_NODE_FIELD(targetList);
WRITE_NODE_FIELD(withCheckOptions);
WRITE_NODE_FIELD(returningList);
WRITE_NODE_FIELD(groupClause);
WRITE_NODE_FIELD(havingQual);
@ -2260,6 +2262,16 @@ _outQuery(StringInfo str, const Query *node)
WRITE_NODE_FIELD(constraintDeps);
}
static void
_outWithCheckOption(StringInfo str, const WithCheckOption *node)
{
WRITE_NODE_TYPE("WITHCHECKOPTION");
WRITE_STRING_FIELD(viewname);
WRITE_NODE_FIELD(qual);
WRITE_BOOL_FIELD(cascaded);
}
static void
_outSortGroupClause(StringInfo str, const SortGroupClause *node)
{
@ -3114,6 +3126,9 @@ _outNode(StringInfo str, const void *obj)
case T_Query:
_outQuery(str, obj);
break;
case T_WithCheckOption:
_outWithCheckOption(str, obj);
break;
case T_SortGroupClause:
_outSortGroupClause(str, obj);
break;

View File

@ -210,6 +210,7 @@ _readQuery(void)
READ_NODE_FIELD(rtable);
READ_NODE_FIELD(jointree);
READ_NODE_FIELD(targetList);
READ_NODE_FIELD(withCheckOptions);
READ_NODE_FIELD(returningList);
READ_NODE_FIELD(groupClause);
READ_NODE_FIELD(havingQual);
@ -254,6 +255,21 @@ _readDeclareCursorStmt(void)
READ_DONE();
}
/*
* _readWithCheckOption
*/
static WithCheckOption *
_readWithCheckOption(void)
{
READ_LOCALS(WithCheckOption);
READ_STRING_FIELD(viewname);
READ_NODE_FIELD(qual);
READ_BOOL_FIELD(cascaded);
READ_DONE();
}
/*
* _readSortGroupClause
*/
@ -1260,6 +1276,8 @@ parseNodeString(void)
if (MATCH("QUERY", 5))
return_value = _readQuery();
else if (MATCH("WITHCHECKOPTION", 15))
return_value = _readWithCheckOption();
else if (MATCH("SORTGROUPCLAUSE", 15))
return_value = _readSortGroupClause();
else if (MATCH("WINDOWCLAUSE", 12))

View File

@ -4702,16 +4702,16 @@ make_result(PlannerInfo *root,
* Build a ModifyTable plan node
*
* Currently, we don't charge anything extra for the actual table modification
* work, nor for the RETURNING expressions if any. It would only be window
* dressing, since these are always top-level nodes and there is no way for
* the costs to change any higher-level planning choices. But we might want
* to make it look better sometime.
* work, nor for the WITH CHECK OPTIONS or RETURNING expressions if any. It
* would only be window dressing, since these are always top-level nodes and
* there is no way for the costs to change any higher-level planning choices.
* But we might want to make it look better sometime.
*/
ModifyTable *
make_modifytable(PlannerInfo *root,
CmdType operation, bool canSetTag,
List *resultRelations,
List *subplans, List *returningLists,
List *resultRelations, List *subplans,
List *withCheckOptionLists, List *returningLists,
List *rowMarks, int epqParam)
{
ModifyTable *node = makeNode(ModifyTable);
@ -4723,6 +4723,8 @@ make_modifytable(PlannerInfo *root,
int i;
Assert(list_length(resultRelations) == list_length(subplans));
Assert(withCheckOptionLists == NIL ||
list_length(resultRelations) == list_length(withCheckOptionLists));
Assert(returningLists == NIL ||
list_length(resultRelations) == list_length(returningLists));
@ -4759,6 +4761,7 @@ make_modifytable(PlannerInfo *root,
node->resultRelations = resultRelations;
node->resultRelIndex = -1; /* will be set correctly in setrefs.c */
node->plans = subplans;
node->withCheckOptionLists = withCheckOptionLists;
node->returningLists = returningLists;
node->rowMarks = rowMarks;
node->epqParam = epqParam;

View File

@ -294,6 +294,7 @@ subquery_planner(PlannerGlobal *glob, Query *parse,
int num_old_subplans = list_length(glob->subplans);
PlannerInfo *root;
Plan *plan;
List *newWithCheckOptions;
List *newHaving;
bool hasOuterJoins;
ListCell *l;
@ -421,6 +422,18 @@ subquery_planner(PlannerGlobal *glob, Query *parse,
preprocess_expression(root, (Node *) parse->targetList,
EXPRKIND_TARGET);
newWithCheckOptions = NIL;
foreach(l, parse->withCheckOptions)
{
WithCheckOption *wco = (WithCheckOption *) lfirst(l);
wco->qual = preprocess_expression(root, wco->qual,
EXPRKIND_QUAL);
if (wco->qual != NULL)
newWithCheckOptions = lappend(newWithCheckOptions, wco);
}
parse->withCheckOptions = newWithCheckOptions;
parse->returningList = (List *)
preprocess_expression(root, (Node *) parse->returningList,
EXPRKIND_TARGET);
@ -559,12 +572,19 @@ subquery_planner(PlannerGlobal *glob, Query *parse,
/* If it's not SELECT, we need a ModifyTable node */
if (parse->commandType != CMD_SELECT)
{
List *withCheckOptionLists;
List *returningLists;
List *rowMarks;
/*
* Set up the RETURNING list-of-lists, if needed.
* Set up the WITH CHECK OPTION and RETURNING lists-of-lists, if
* needed.
*/
if (parse->withCheckOptions)
withCheckOptionLists = list_make1(parse->withCheckOptions);
else
withCheckOptionLists = NIL;
if (parse->returningList)
returningLists = list_make1(parse->returningList);
else
@ -585,6 +605,7 @@ subquery_planner(PlannerGlobal *glob, Query *parse,
parse->canSetTag,
list_make1_int(parse->resultRelation),
list_make1(plan),
withCheckOptionLists,
returningLists,
rowMarks,
SS_assign_special_param(root));
@ -770,6 +791,7 @@ inheritance_planner(PlannerInfo *root)
RelOptInfo **save_rel_array = NULL;
List *subplans = NIL;
List *resultRelations = NIL;
List *withCheckOptionLists = NIL;
List *returningLists = NIL;
List *rowMarks;
ListCell *lc;
@ -930,7 +952,10 @@ inheritance_planner(PlannerInfo *root)
/* Build list of target-relation RT indexes */
resultRelations = lappend_int(resultRelations, appinfo->child_relid);
/* Build list of per-relation RETURNING targetlists */
/* Build lists of per-relation WCO and RETURNING targetlists */
if (parse->withCheckOptions)
withCheckOptionLists = lappend(withCheckOptionLists,
subroot.parse->withCheckOptions);
if (parse->returningList)
returningLists = lappend(returningLists,
subroot.parse->returningList);
@ -979,6 +1004,7 @@ inheritance_planner(PlannerInfo *root)
parse->canSetTag,
resultRelations,
subplans,
withCheckOptionLists,
returningLists,
rowMarks,
SS_assign_special_param(root));

View File

@ -470,7 +470,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <list> constraints_set_list
%type <boolean> constraints_set_mode
%type <str> OptTableSpace OptConsTableSpace OptTableSpaceOwner
%type <list> opt_check_option
%type <ival> opt_check_option
%type <str> opt_provider security_label
@ -7995,6 +7995,7 @@ ViewStmt: CREATE OptTemp VIEW qualified_name opt_column_list opt_reloptions
n->query = $8;
n->replace = false;
n->options = $6;
n->withCheckOption = $9;
$$ = (Node *) n;
}
| CREATE OR REPLACE OptTemp VIEW qualified_name opt_column_list opt_reloptions
@ -8007,10 +8008,11 @@ ViewStmt: CREATE OptTemp VIEW qualified_name opt_column_list opt_reloptions
n->query = $10;
n->replace = true;
n->options = $8;
n->withCheckOption = $11;
$$ = (Node *) n;
}
| CREATE OptTemp RECURSIVE VIEW qualified_name '(' columnList ')' opt_reloptions
AS SelectStmt
AS SelectStmt opt_check_option
{
ViewStmt *n = makeNode(ViewStmt);
n->view = $5;
@ -8019,10 +8021,16 @@ ViewStmt: CREATE OptTemp VIEW qualified_name opt_column_list opt_reloptions
n->query = makeRecursiveViewSelect(n->view->relname, n->aliases, $11);
n->replace = false;
n->options = $9;
n->withCheckOption = $12;
if (n->withCheckOption != NO_CHECK_OPTION)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("WITH CHECK OPTION not supported on recursive views"),
parser_errposition(@12)));
$$ = (Node *) n;
}
| CREATE OR REPLACE OptTemp RECURSIVE VIEW qualified_name '(' columnList ')' opt_reloptions
AS SelectStmt
AS SelectStmt opt_check_option
{
ViewStmt *n = makeNode(ViewStmt);
n->view = $7;
@ -8031,30 +8039,21 @@ ViewStmt: CREATE OptTemp VIEW qualified_name opt_column_list opt_reloptions
n->query = makeRecursiveViewSelect(n->view->relname, n->aliases, $13);
n->replace = true;
n->options = $11;
n->withCheckOption = $14;
if (n->withCheckOption != NO_CHECK_OPTION)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("WITH CHECK OPTION not supported on recursive views"),
parser_errposition(@14)));
$$ = (Node *) n;
}
;
opt_check_option:
WITH CHECK OPTION
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("WITH CHECK OPTION is not implemented")));
}
| WITH CASCADED CHECK OPTION
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("WITH CHECK OPTION is not implemented")));
}
| WITH LOCAL CHECK OPTION
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("WITH CHECK OPTION is not implemented")));
}
| /* EMPTY */ { $$ = NIL; }
WITH CHECK OPTION { $$ = CASCADED_CHECK_OPTION; }
| WITH CASCADED CHECK OPTION { $$ = CASCADED_CHECK_OPTION; }
| WITH LOCAL CHECK OPTION { $$ = LOCAL_CHECK_OPTION; }
| /* EMPTY */ { $$ = NO_CHECK_OPTION; }
;
/*****************************************************************************

View File

@ -19,6 +19,7 @@
#include "foreign/fdwapi.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/clauses.h"
#include "parser/analyze.h"
#include "parser/parse_coerce.h"
#include "parser/parsetree.h"
@ -1866,7 +1867,7 @@ fireRules(Query *parsetree,
* Caller should have verified that the relation is a view, and therefore
* we should find an ON SELECT action.
*/
static Query *
Query *
get_view_query(Relation view)
{
int i;
@ -1927,11 +1928,16 @@ view_has_instead_trigger(Relation view, CmdType event)
/*
* view_is_auto_updatable -
* Test if the specified view can be automatically updated. This will
* either return NULL (if the view can be updated) or a message string
* giving the reason that it cannot be.
* Retrive the view definition and options and then determine if the view
* can be auto-updated by calling view_query_is_auto_updatable(). Returns
* NULL or a message string giving the reason the view is not auto
* updateable. See view_query_is_auto_updatable() for details.
*
* Caller must have verified that relation is a view!
* The only view option which affects if a view can be auto-updated, today,
* is the security_barrier option. If other options are added later, they
* will also need to be handled here.
*
* Caller must have verified that the relation is a view!
*
* Note that the checks performed here are local to this view. We do not
* check whether the view's underlying base relation is updatable; that
@ -1940,10 +1946,32 @@ view_has_instead_trigger(Relation view, CmdType event)
* Also note that we don't check for INSTEAD triggers or rules here; those
* also prevent auto-update, but they must be checked for by the caller.
*/
static const char *
const char *
view_is_auto_updatable(Relation view)
{
Query *viewquery = get_view_query(view);
bool security_barrier = RelationIsSecurityView(view);
return view_query_is_auto_updatable(viewquery, security_barrier);
}
/*
* view_query_is_auto_updatable -
* Test if the specified view definition can be automatically updated, given
* the view's options (currently only security_barrier affects a view's
* auto-updatable status).
*
* This will either return NULL (if the view can be updated) or a message
* string giving the reason that it cannot be.
*
* Note that the checks performed here are only based on the view
* definition. We do not check whether any base relations referred to by
* the view are updatable.
*/
const char *
view_query_is_auto_updatable(Query *viewquery, bool security_barrier)
{
RangeTblRef *rtr;
RangeTblEntry *base_rte;
Bitmapset *bms;
@ -1995,9 +2023,9 @@ view_is_auto_updatable(Relation view)
/*
* For now, we also don't support security-barrier views, because of the
* difficulty of keeping upper-level qual expressions away from
* lower-level data. This might get relaxed in future.
* lower-level data. This might get relaxed in the future.
*/
if (RelationIsSecurityView(view))
if (security_barrier)
return gettext_noop("Security-barrier views are not automatically updatable.");
/*
@ -2532,8 +2560,7 @@ rewriteTargetView(Query *parsetree, Relation view)
* only adjust their varnos to reference the new target (just the same as
* we did with the view targetlist).
*
* For INSERT, the view's quals can be ignored for now. When we implement
* WITH CHECK OPTION, this might be a good place to collect them.
* For INSERT, the view's quals can be ignored in the main query.
*/
if (parsetree->commandType != CMD_INSERT &&
viewquery->jointree->quals != NULL)
@ -2544,6 +2571,76 @@ rewriteTargetView(Query *parsetree, Relation view)
AddQual(parsetree, (Node *) viewqual);
}
/*
* For INSERT/UPDATE, if the view has the WITH CHECK OPTION, or any parent
* view specified WITH CASCADED CHECK OPTION, add the quals from the view
* to the query's withCheckOptions list.
*/
if (parsetree->commandType != CMD_DELETE)
{
bool has_wco = RelationHasCheckOption(view);
bool cascaded = RelationHasCascadedCheckOption(view);
/*
* If the parent view has a cascaded check option, treat this view as
* if it also had a cascaded check option.
*
* New WithCheckOptions are added to the start of the list, so if there
* is a cascaded check option, it will be the first item in the list.
*/
if (parsetree->withCheckOptions != NIL)
{
WithCheckOption *parent_wco =
(WithCheckOption *) linitial(parsetree->withCheckOptions);
if (parent_wco->cascaded)
{
has_wco = true;
cascaded = true;
}
}
/*
* Add the new WithCheckOption to the start of the list, so that
* checks on inner views are run before checks on outer views, as
* required by the SQL standard.
*
* If the new check is CASCADED, we need to add it even if this view
* has no quals, since there may be quals on child views. A LOCAL
* check can be omitted if this view has no quals.
*/
if (has_wco && (cascaded || viewquery->jointree->quals != NULL))
{
WithCheckOption *wco;
wco = makeNode(WithCheckOption);
wco->viewname = pstrdup(RelationGetRelationName(view));
wco->qual = NULL;
wco->cascaded = cascaded;
parsetree->withCheckOptions = lcons(wco,
parsetree->withCheckOptions);
if (viewquery->jointree->quals != NULL)
{
wco->qual = (Node *) copyObject(viewquery->jointree->quals);
ChangeVarNodes(wco->qual, base_rt_index, new_rt_index, 0);
/*
* Make sure that the query is marked correctly if the added
* qual has sublinks. We can skip this check if the query is
* already marked, or if the command is an UPDATE, in which
* case the same qual will have already been added to the
* query's WHERE clause, and AddQual will have already done
* this check.
*/
if (!parsetree->hasSubLinks &&
parsetree->commandType != CMD_UPDATE)
parsetree->hasSubLinks = checkExprHasSubLink(wco->qual);
}
}
}
return parsetree;
}

View File

@ -4224,6 +4224,7 @@ getTables(Archive *fout, int *numTables)
int i_owning_col;
int i_reltablespace;
int i_reloptions;
int i_checkoption;
int i_toastreloptions;
int i_reloftype;
int i_relpages;
@ -4271,7 +4272,9 @@ getTables(Archive *fout, int *numTables)
"d.refobjid AS owning_tab, "
"d.refobjsubid AS owning_col, "
"(SELECT spcname FROM pg_tablespace t WHERE t.oid = c.reltablespace) AS reltablespace, "
"array_to_string(c.reloptions, ', ') AS reloptions, "
"array_to_string(array_remove(array_remove(c.reloptions,'check_option=local'),'check_option=cascaded'), ', ') AS reloptions, "
"CASE WHEN 'check_option=local' = ANY (c.reloptions) THEN 'LOCAL'::text "
"WHEN 'check_option=cascaded' = ANY (c.reloptions) THEN 'CASCADED'::text ELSE NULL END AS checkoption, "
"array_to_string(array(SELECT 'toast.' || x FROM unnest(tc.reloptions) x), ', ') AS toast_reloptions "
"FROM pg_class c "
"LEFT JOIN pg_depend d ON "
@ -4635,6 +4638,7 @@ getTables(Archive *fout, int *numTables)
i_owning_col = PQfnumber(res, "owning_col");
i_reltablespace = PQfnumber(res, "reltablespace");
i_reloptions = PQfnumber(res, "reloptions");
i_checkoption = PQfnumber(res, "checkoption");
i_toastreloptions = PQfnumber(res, "toast_reloptions");
i_reloftype = PQfnumber(res, "reloftype");
@ -4694,6 +4698,10 @@ getTables(Archive *fout, int *numTables)
}
tblinfo[i].reltablespace = pg_strdup(PQgetvalue(res, i, i_reltablespace));
tblinfo[i].reloptions = pg_strdup(PQgetvalue(res, i, i_reloptions));
if (i_checkoption == -1 || PQgetisnull(res, i, i_checkoption))
tblinfo[i].checkoption = NULL;
else
tblinfo[i].checkoption = pg_strdup(PQgetvalue(res, i, i_checkoption));
tblinfo[i].toast_reloptions = pg_strdup(PQgetvalue(res, i, i_toastreloptions));
/* other fields were zeroed above */
@ -12835,9 +12843,13 @@ dumpTableSchema(Archive *fout, TableInfo *tbinfo)
if (tbinfo->reloptions && strlen(tbinfo->reloptions) > 0)
appendPQExpBuffer(q, " WITH (%s)", tbinfo->reloptions);
result = createViewAsClause(fout, tbinfo);
appendPQExpBuffer(q, " AS\n%s;\n", result->data);
appendPQExpBuffer(q, " AS\n%s", result->data);
destroyPQExpBuffer(result);
if (tbinfo->checkoption != NULL)
appendPQExpBuffer(q, "\n WITH %s CHECK OPTION", tbinfo->checkoption);
appendPQExpBuffer(q, ";\n");
appendPQExpBuffer(labelq, "VIEW %s",
fmtId(tbinfo->dobj.name));
}

View File

@ -239,6 +239,7 @@ typedef struct _tableInfo
bool relispopulated; /* relation is populated */
char *reltablespace; /* relation tablespace */
char *reloptions; /* options specified by WITH (...) */
char *checkoption; /* WITH CHECK OPTION */
char *toast_reloptions; /* ditto, for the TOAST table */
bool hasindex; /* does it have any indexes? */
bool hasrules; /* does it have any rules? */

View File

@ -53,6 +53,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 201307161
#define CATALOG_VERSION_NO 201307181
#endif

View File

@ -16,6 +16,8 @@
#include "nodes/parsenodes.h"
extern void validateWithCheckOption(char *value);
extern Oid DefineView(ViewStmt *stmt, const char *queryString);
extern void StoreViewQuery(Oid viewOid, Query *viewParse, bool replace);

View File

@ -191,6 +191,8 @@ extern ResultRelInfo *ExecGetTriggerResultRel(EState *estate, Oid relid);
extern bool ExecContextForcesOids(PlanState *planstate, bool *hasoids);
extern void ExecConstraints(ResultRelInfo *resultRelInfo,
TupleTableSlot *slot, EState *estate);
extern void ExecWithCheckOptions(ResultRelInfo *resultRelInfo,
TupleTableSlot *slot, EState *estate);
extern ExecRowMark *ExecFindRowMark(EState *estate, Index rti);
extern ExecAuxRowMark *ExecBuildAuxRowMark(ExecRowMark *erm, List *targetlist);
extern TupleTableSlot *EvalPlanQual(EState *estate, EPQState *epqstate,

View File

@ -303,6 +303,8 @@ typedef struct JunkFilter
* TrigInstrument optional runtime measurements for triggers
* FdwRoutine FDW callback functions, if foreign table
* FdwState available to save private state of FDW
* WithCheckOptions list of WithCheckOption's for views
* WithCheckOptionExprs list of WithCheckOption expr states
* ConstraintExprs array of constraint-checking expr states
* junkFilter for removing junk attributes from tuples
* projectReturning for computing a RETURNING list
@ -322,6 +324,8 @@ typedef struct ResultRelInfo
Instrumentation *ri_TrigInstrument;
struct FdwRoutine *ri_FdwRoutine;
void *ri_FdwState;
List *ri_WithCheckOptions;
List *ri_WithCheckOptionExprs;
List **ri_ConstraintExprs;
JunkFilter *ri_junkFilter;
ProjectionInfo *ri_projectReturning;

View File

@ -388,6 +388,7 @@ typedef enum NodeTag
T_Constraint,
T_DefElem,
T_RangeTblEntry,
T_WithCheckOption,
T_SortGroupClause,
T_WindowClause,
T_PrivGrantee,

View File

@ -128,6 +128,8 @@ typedef struct Query
List *targetList; /* target list (of TargetEntry) */
List *withCheckOptions; /* a list of WithCheckOption's */
List *returningList; /* return-values list (of TargetEntry) */
List *groupClause; /* a list of SortGroupClause's */
@ -783,6 +785,19 @@ typedef struct RangeTblEntry
Bitmapset *modifiedCols; /* columns needing INSERT/UPDATE permission */
} RangeTblEntry;
/*
* WithCheckOption -
* representation of WITH CHECK OPTION checks to be applied to new tuples
* when inserting/updating an auto-updatable view.
*/
typedef struct WithCheckOption
{
NodeTag type;
char *viewname; /* name of view that specified the WCO */
Node *qual; /* constraint qual to check */
bool cascaded; /* true = WITH CASCADED CHECK OPTION */
} WithCheckOption;
/*
* SortGroupClause -
* representation of ORDER BY, GROUP BY, PARTITION BY,
@ -2333,6 +2348,13 @@ typedef struct AlterEnumStmt
* Create View Statement
* ----------------------
*/
typedef enum ViewCheckOption
{
NO_CHECK_OPTION,
LOCAL_CHECK_OPTION,
CASCADED_CHECK_OPTION
} ViewCheckOption;
typedef struct ViewStmt
{
NodeTag type;
@ -2341,6 +2363,7 @@ typedef struct ViewStmt
Node *query; /* the SELECT query */
bool replace; /* replace an existing view? */
List *options; /* options from WITH clause */
ViewCheckOption withCheckOption; /* WITH CHECK OPTION */
} ViewStmt;
/* ----------------------

View File

@ -172,6 +172,7 @@ typedef struct ModifyTable
List *resultRelations; /* integer list of RT indexes */
int resultRelIndex; /* index of first resultRel in plan's list */
List *plans; /* plan(s) producing source data */
List *withCheckOptionLists; /* per-target-table WCO lists */
List *returningLists; /* per-target-table RETURNING tlists */
List *fdwPrivLists; /* per-target-table FDW private data lists */
List *rowMarks; /* PlanRowMarks (non-locking only) */

View File

@ -85,7 +85,8 @@ extern Result *make_result(PlannerInfo *root, List *tlist,
Node *resconstantqual, Plan *subplan);
extern ModifyTable *make_modifytable(PlannerInfo *root,
CmdType operation, bool canSetTag,
List *resultRelations, List *subplans, List *returningLists,
List *resultRelations, List *subplans,
List *withCheckOptionLists, List *returningLists,
List *rowMarks, int epqParam);
extern bool is_projection_capable_plan(Plan *plan);

View File

@ -21,6 +21,10 @@ extern List *QueryRewrite(Query *parsetree);
extern void AcquireRewriteLocks(Query *parsetree, bool forUpdatePushedDown);
extern Node *build_column_default(Relation rel, int attrno);
extern Query *get_view_query(Relation view);
extern const char *view_is_auto_updatable(Relation view);
extern const char *view_query_is_auto_updatable(Query *viewquery,
bool security_barrier);
extern int relation_is_updatable(Oid reloid, bool include_triggers);
#endif /* REWRITEHANDLER_H */

View File

@ -208,6 +208,7 @@ typedef struct StdRdOptions
int fillfactor; /* page fill factor in percent (0..100) */
AutoVacOpts autovacuum; /* autovacuum-related options */
bool security_barrier; /* for views */
int check_option_offset; /* for views */
} StdRdOptions;
#define HEAP_MIN_FILLFACTOR 10
@ -243,6 +244,39 @@ typedef struct StdRdOptions
((relation)->rd_options ? \
((StdRdOptions *) (relation)->rd_options)->security_barrier : false)
/*
* RelationHasCheckOption
* Returns true if the relation is a view defined with either the local
* or the cascaded check option.
*/
#define RelationHasCheckOption(relation) \
((relation)->rd_options && \
((StdRdOptions *) (relation)->rd_options)->check_option_offset != 0)
/*
* RelationHasLocalCheckOption
* Returns true if the relation is a view defined with the local check
* option.
*/
#define RelationHasLocalCheckOption(relation) \
((relation)->rd_options && \
((StdRdOptions *) (relation)->rd_options)->check_option_offset != 0 ? \
strcmp((char *) (relation)->rd_options + \
((StdRdOptions *) (relation)->rd_options)->check_option_offset, \
"local") == 0 : false)
/*
* RelationHasCascadedCheckOption
* Returns true if the relation is a view defined with the cascaded check
* option.
*/
#define RelationHasCascadedCheckOption(relation) \
((relation)->rd_options && \
((StdRdOptions *) (relation)->rd_options)->check_option_offset != 0 ? \
strcmp((char *) (relation)->rd_options + \
((StdRdOptions *) (relation)->rd_options)->check_option_offset, \
"cascaded") == 0 : false)
/*
* RelationIsValid
* True iff relation descriptor is valid.

View File

@ -252,7 +252,7 @@ CREATE VIEW mysecview4 WITH (security_barrier)
AS SELECT * FROM tbl1 WHERE a <> 0;
CREATE VIEW mysecview5 WITH (security_barrier=100) -- Error
AS SELECT * FROM tbl1 WHERE a > 100;
ERROR: invalid value for boolean option "security_barrier": 100
ERROR: security_barrier requires a Boolean value
CREATE VIEW mysecview6 WITH (invalid_option) -- Error
AS SELECT * FROM tbl1 WHERE a < 100;
ERROR: unrecognized parameter "invalid_option"

View File

@ -1163,3 +1163,366 @@ DROP TABLE base_tbl_parent, base_tbl_child CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to view rw_view1
drop cascades to view rw_view2
-- simple WITH CHECK OPTION
CREATE TABLE base_tbl (a int, b int DEFAULT 10);
INSERT INTO base_tbl VALUES (1,2), (2,3), (1,-1);
CREATE VIEW rw_view1 AS SELECT * FROM base_tbl WHERE a < b
WITH LOCAL CHECK OPTION;
\d+ rw_view1
View "public.rw_view1"
Column | Type | Modifiers | Storage | Description
--------+---------+-----------+---------+-------------
a | integer | | plain |
b | integer | | plain |
View definition:
SELECT base_tbl.a,
base_tbl.b
FROM base_tbl
WHERE base_tbl.a < base_tbl.b;
Options: check_option=local
SELECT * FROM information_schema.views WHERE table_name = 'rw_view1';
table_catalog | table_schema | table_name | view_definition | check_option | is_updatable | is_insertable_into | is_trigger_updatable | is_trigger_deletable | is_trigger_insertable_into
---------------+--------------+------------+------------------------------------+--------------+--------------+--------------------+----------------------+----------------------+----------------------------
regression | public | rw_view1 | SELECT base_tbl.a, +| LOCAL | YES | YES | NO | NO | NO
| | | base_tbl.b +| | | | | |
| | | FROM base_tbl +| | | | | |
| | | WHERE (base_tbl.a < base_tbl.b); | | | | | |
(1 row)
INSERT INTO rw_view1 VALUES(3,4); -- ok
INSERT INTO rw_view1 VALUES(4,3); -- should fail
ERROR: new row violates WITH CHECK OPTION for view "rw_view1"
DETAIL: Failing row contains (4, 3).
INSERT INTO rw_view1 VALUES(5,null); -- should fail
ERROR: new row violates WITH CHECK OPTION for view "rw_view1"
DETAIL: Failing row contains (5, null).
UPDATE rw_view1 SET b = 5 WHERE a = 3; -- ok
UPDATE rw_view1 SET b = -5 WHERE a = 3; -- should fail
ERROR: new row violates WITH CHECK OPTION for view "rw_view1"
DETAIL: Failing row contains (3, -5).
INSERT INTO rw_view1(a) VALUES (9); -- ok
INSERT INTO rw_view1(a) VALUES (10); -- should fail
ERROR: new row violates WITH CHECK OPTION for view "rw_view1"
DETAIL: Failing row contains (10, 10).
SELECT * FROM base_tbl;
a | b
---+----
1 | 2
2 | 3
1 | -1
3 | 5
9 | 10
(5 rows)
DROP TABLE base_tbl CASCADE;
NOTICE: drop cascades to view rw_view1
-- WITH LOCAL/CASCADED CHECK OPTION
CREATE TABLE base_tbl (a int);
CREATE VIEW rw_view1 AS SELECT * FROM base_tbl WHERE a > 0;
CREATE VIEW rw_view2 AS SELECT * FROM rw_view1 WHERE a < 10
WITH CHECK OPTION; -- implicitly cascaded
\d+ rw_view2
View "public.rw_view2"
Column | Type | Modifiers | Storage | Description
--------+---------+-----------+---------+-------------
a | integer | | plain |
View definition:
SELECT rw_view1.a
FROM rw_view1
WHERE rw_view1.a < 10;
Options: check_option=cascaded
SELECT * FROM information_schema.views WHERE table_name = 'rw_view2';
table_catalog | table_schema | table_name | view_definition | check_option | is_updatable | is_insertable_into | is_trigger_updatable | is_trigger_deletable | is_trigger_insertable_into
---------------+--------------+------------+----------------------------+--------------+--------------+--------------------+----------------------+----------------------+----------------------------
regression | public | rw_view2 | SELECT rw_view1.a +| CASCADED | YES | YES | NO | NO | NO
| | | FROM rw_view1 +| | | | | |
| | | WHERE (rw_view1.a < 10); | | | | | |
(1 row)
INSERT INTO rw_view2 VALUES (-5); -- should fail
ERROR: new row violates WITH CHECK OPTION for view "rw_view1"
DETAIL: Failing row contains (-5).
INSERT INTO rw_view2 VALUES (5); -- ok
INSERT INTO rw_view2 VALUES (15); -- should fail
ERROR: new row violates WITH CHECK OPTION for view "rw_view2"
DETAIL: Failing row contains (15).
SELECT * FROM base_tbl;
a
---
5
(1 row)
UPDATE rw_view2 SET a = a - 10; -- should fail
ERROR: new row violates WITH CHECK OPTION for view "rw_view1"
DETAIL: Failing row contains (-5).
UPDATE rw_view2 SET a = a + 10; -- should fail
ERROR: new row violates WITH CHECK OPTION for view "rw_view2"
DETAIL: Failing row contains (15).
CREATE OR REPLACE VIEW rw_view2 AS SELECT * FROM rw_view1 WHERE a < 10
WITH LOCAL CHECK OPTION;
\d+ rw_view2
View "public.rw_view2"
Column | Type | Modifiers | Storage | Description
--------+---------+-----------+---------+-------------
a | integer | | plain |
View definition:
SELECT rw_view1.a
FROM rw_view1
WHERE rw_view1.a < 10;
Options: check_option=local
SELECT * FROM information_schema.views WHERE table_name = 'rw_view2';
table_catalog | table_schema | table_name | view_definition | check_option | is_updatable | is_insertable_into | is_trigger_updatable | is_trigger_deletable | is_trigger_insertable_into
---------------+--------------+------------+----------------------------+--------------+--------------+--------------------+----------------------+----------------------+----------------------------
regression | public | rw_view2 | SELECT rw_view1.a +| LOCAL | YES | YES | NO | NO | NO
| | | FROM rw_view1 +| | | | | |
| | | WHERE (rw_view1.a < 10); | | | | | |
(1 row)
INSERT INTO rw_view2 VALUES (-10); -- ok, but not in view
INSERT INTO rw_view2 VALUES (20); -- should fail
ERROR: new row violates WITH CHECK OPTION for view "rw_view2"
DETAIL: Failing row contains (20).
SELECT * FROM base_tbl;
a
-----
5
-10
(2 rows)
ALTER VIEW rw_view1 SET (check_option=here); -- invalid
ERROR: invalid value for "check_option" option
DETAIL: Valid values are "local", and "cascaded".
ALTER VIEW rw_view1 SET (check_option=local);
INSERT INTO rw_view2 VALUES (-20); -- should fail
ERROR: new row violates WITH CHECK OPTION for view "rw_view1"
DETAIL: Failing row contains (-20).
INSERT INTO rw_view2 VALUES (30); -- should fail
ERROR: new row violates WITH CHECK OPTION for view "rw_view2"
DETAIL: Failing row contains (30).
ALTER VIEW rw_view2 RESET (check_option);
\d+ rw_view2
View "public.rw_view2"
Column | Type | Modifiers | Storage | Description
--------+---------+-----------+---------+-------------
a | integer | | plain |
View definition:
SELECT rw_view1.a
FROM rw_view1
WHERE rw_view1.a < 10;
SELECT * FROM information_schema.views WHERE table_name = 'rw_view2';
table_catalog | table_schema | table_name | view_definition | check_option | is_updatable | is_insertable_into | is_trigger_updatable | is_trigger_deletable | is_trigger_insertable_into
---------------+--------------+------------+----------------------------+--------------+--------------+--------------------+----------------------+----------------------+----------------------------
regression | public | rw_view2 | SELECT rw_view1.a +| NONE | YES | YES | NO | NO | NO
| | | FROM rw_view1 +| | | | | |
| | | WHERE (rw_view1.a < 10); | | | | | |
(1 row)
INSERT INTO rw_view2 VALUES (30); -- ok, but not in view
SELECT * FROM base_tbl;
a
-----
5
-10
30
(3 rows)
DROP TABLE base_tbl CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to view rw_view1
drop cascades to view rw_view2
-- WITH CHECK OPTION with no local view qual
CREATE TABLE base_tbl (a int);
CREATE VIEW rw_view1 AS SELECT * FROM base_tbl WITH CHECK OPTION;
CREATE VIEW rw_view2 AS SELECT * FROM rw_view1 WHERE a > 0;
CREATE VIEW rw_view3 AS SELECT * FROM rw_view2 WITH CHECK OPTION;
SELECT * FROM information_schema.views WHERE table_name LIKE E'rw\_view_' ORDER BY table_name;
table_catalog | table_schema | table_name | view_definition | check_option | is_updatable | is_insertable_into | is_trigger_updatable | is_trigger_deletable | is_trigger_insertable_into
---------------+--------------+------------+---------------------------+--------------+--------------+--------------------+----------------------+----------------------+----------------------------
regression | public | rw_view1 | SELECT base_tbl.a +| CASCADED | YES | YES | NO | NO | NO
| | | FROM base_tbl; | | | | | |
regression | public | rw_view2 | SELECT rw_view1.a +| NONE | YES | YES | NO | NO | NO
| | | FROM rw_view1 +| | | | | |
| | | WHERE (rw_view1.a > 0); | | | | | |
regression | public | rw_view3 | SELECT rw_view2.a +| CASCADED | YES | YES | NO | NO | NO
| | | FROM rw_view2; | | | | | |
(3 rows)
INSERT INTO rw_view1 VALUES (-1); -- ok
INSERT INTO rw_view1 VALUES (1); -- ok
INSERT INTO rw_view2 VALUES (-2); -- ok, but not in view
INSERT INTO rw_view2 VALUES (2); -- ok
INSERT INTO rw_view3 VALUES (-3); -- should fail
ERROR: new row violates WITH CHECK OPTION for view "rw_view2"
DETAIL: Failing row contains (-3).
INSERT INTO rw_view3 VALUES (3); -- ok
DROP TABLE base_tbl CASCADE;
NOTICE: drop cascades to 3 other objects
DETAIL: drop cascades to view rw_view1
drop cascades to view rw_view2
drop cascades to view rw_view3
-- WITH CHECK OPTION with subquery
CREATE TABLE base_tbl (a int);
CREATE TABLE ref_tbl (a int PRIMARY KEY);
INSERT INTO ref_tbl SELECT * FROM generate_series(1,10);
CREATE VIEW rw_view1 AS
SELECT * FROM base_tbl b
WHERE EXISTS(SELECT 1 FROM ref_tbl r WHERE r.a = b.a)
WITH CHECK OPTION;
INSERT INTO rw_view1 VALUES (5); -- ok
INSERT INTO rw_view1 VALUES (15); -- should fail
ERROR: new row violates WITH CHECK OPTION for view "rw_view1"
DETAIL: Failing row contains (15).
UPDATE rw_view1 SET a = a + 5; -- ok
UPDATE rw_view1 SET a = a + 5; -- should fail
ERROR: new row violates WITH CHECK OPTION for view "rw_view1"
DETAIL: Failing row contains (15).
EXPLAIN (costs off) INSERT INTO rw_view1 VALUES (5);
QUERY PLAN
---------------------------------------------------------------
Insert on base_tbl b
-> Result
SubPlan 1
-> Index Only Scan using ref_tbl_pkey on ref_tbl r
Index Cond: (a = b.a)
SubPlan 2
-> Seq Scan on ref_tbl r_1
(7 rows)
EXPLAIN (costs off) UPDATE rw_view1 SET a = a + 5;
QUERY PLAN
-----------------------------------------------------------------
Update on base_tbl b
-> Hash Semi Join
Hash Cond: (b.a = r.a)
-> Seq Scan on base_tbl b
-> Hash
-> Seq Scan on ref_tbl r
SubPlan 1
-> Index Only Scan using ref_tbl_pkey on ref_tbl r_1
Index Cond: (a = b.a)
SubPlan 2
-> Seq Scan on ref_tbl r_2
(11 rows)
DROP TABLE base_tbl, ref_tbl CASCADE;
NOTICE: drop cascades to view rw_view1
-- WITH CHECK OPTION with BEFORE trigger on base table
CREATE TABLE base_tbl (a int, b int);
CREATE FUNCTION base_tbl_trig_fn()
RETURNS trigger AS
$$
BEGIN
NEW.b := 10;
RETURN NEW;
END;
$$
LANGUAGE plpgsql;
CREATE TRIGGER base_tbl_trig BEFORE INSERT OR UPDATE ON base_tbl
FOR EACH ROW EXECUTE PROCEDURE base_tbl_trig_fn();
CREATE VIEW rw_view1 AS SELECT * FROM base_tbl WHERE a < b WITH CHECK OPTION;
INSERT INTO rw_view1 VALUES (5,0); -- ok
INSERT INTO rw_view1 VALUES (15, 20); -- should fail
ERROR: new row violates WITH CHECK OPTION for view "rw_view1"
DETAIL: Failing row contains (15, 10).
UPDATE rw_view1 SET a = 20, b = 30; -- should fail
ERROR: new row violates WITH CHECK OPTION for view "rw_view1"
DETAIL: Failing row contains (20, 10).
DROP TABLE base_tbl CASCADE;
NOTICE: drop cascades to view rw_view1
DROP FUNCTION base_tbl_trig_fn();
-- WITH LOCAL CHECK OPTION with INSTEAD OF trigger on base view
CREATE TABLE base_tbl (a int, b int);
CREATE VIEW rw_view1 AS SELECT a FROM base_tbl WHERE a < b;
CREATE FUNCTION rw_view1_trig_fn()
RETURNS trigger AS
$$
BEGIN
IF TG_OP = 'INSERT' THEN
INSERT INTO base_tbl VALUES (NEW.a, 10);
RETURN NEW;
ELSIF TG_OP = 'UPDATE' THEN
UPDATE base_tbl SET a=NEW.a WHERE a=OLD.a;
RETURN NEW;
ELSIF TG_OP = 'DELETE' THEN
DELETE FROM base_tbl WHERE a=OLD.a;
RETURN OLD;
END IF;
END;
$$
LANGUAGE plpgsql;
CREATE TRIGGER rw_view1_trig
INSTEAD OF INSERT OR UPDATE OR DELETE ON rw_view1
FOR EACH ROW EXECUTE PROCEDURE rw_view1_trig_fn();
CREATE VIEW rw_view2 AS
SELECT * FROM rw_view1 WHERE a > 0 WITH LOCAL CHECK OPTION;
INSERT INTO rw_view2 VALUES (-5); -- should fail
ERROR: new row violates WITH CHECK OPTION for view "rw_view2"
DETAIL: Failing row contains (-5).
INSERT INTO rw_view2 VALUES (5); -- ok
INSERT INTO rw_view2 VALUES (50); -- ok, but not in view
UPDATE rw_view2 SET a = a - 10; -- should fail
ERROR: new row violates WITH CHECK OPTION for view "rw_view2"
DETAIL: Failing row contains (-5).
SELECT * FROM base_tbl;
a | b
----+----
5 | 10
50 | 10
(2 rows)
-- Check option won't cascade down to base view with INSTEAD OF triggers
ALTER VIEW rw_view2 SET (check_option=cascaded);
INSERT INTO rw_view2 VALUES (100); -- ok, but not in view (doesn't fail rw_view1's check)
UPDATE rw_view2 SET a = 200 WHERE a = 5; -- ok, but not in view (doesn't fail rw_view1's check)
SELECT * FROM base_tbl;
a | b
-----+----
50 | 10
100 | 10
200 | 10
(3 rows)
-- Neither local nor cascaded check options work with INSTEAD rules
DROP TRIGGER rw_view1_trig ON rw_view1;
CREATE RULE rw_view1_ins_rule AS ON INSERT TO rw_view1
DO INSTEAD INSERT INTO base_tbl VALUES (NEW.a, 10);
CREATE RULE rw_view1_upd_rule AS ON UPDATE TO rw_view1
DO INSTEAD UPDATE base_tbl SET a=NEW.a WHERE a=OLD.a;
INSERT INTO rw_view2 VALUES (-10); -- ok, but not in view (doesn't fail rw_view2's check)
INSERT INTO rw_view2 VALUES (5); -- ok
INSERT INTO rw_view2 VALUES (20); -- ok, but not in view (doesn't fail rw_view1's check)
UPDATE rw_view2 SET a = 30 WHERE a = 5; -- ok, but not in view (doesn't fail rw_view1's check)
INSERT INTO rw_view2 VALUES (5); -- ok
UPDATE rw_view2 SET a = -5 WHERE a = 5; -- ok, but not in view (doesn't fail rw_view2's check)
SELECT * FROM base_tbl;
a | b
-----+----
50 | 10
100 | 10
200 | 10
-10 | 10
20 | 10
30 | 10
-5 | 10
(7 rows)
DROP TABLE base_tbl CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to view rw_view1
drop cascades to view rw_view2
DROP FUNCTION rw_view1_trig_fn();
CREATE TABLE base_tbl (a int);
CREATE VIEW rw_view1 AS SELECT a,10 AS b FROM base_tbl;
CREATE RULE rw_view1_ins_rule AS ON INSERT TO rw_view1
DO INSTEAD INSERT INTO base_tbl VALUES (NEW.a);
CREATE VIEW rw_view2 AS
SELECT * FROM rw_view1 WHERE a > b WITH LOCAL CHECK OPTION;
INSERT INTO rw_view2 VALUES (2,3); -- ok, but not in view (doesn't fail rw_view2's check)
DROP TABLE base_tbl CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to view rw_view1
drop cascades to view rw_view2

View File

@ -541,3 +541,202 @@ SELECT * FROM ONLY base_tbl_parent ORDER BY a;
SELECT * FROM base_tbl_child ORDER BY a;
DROP TABLE base_tbl_parent, base_tbl_child CASCADE;
-- simple WITH CHECK OPTION
CREATE TABLE base_tbl (a int, b int DEFAULT 10);
INSERT INTO base_tbl VALUES (1,2), (2,3), (1,-1);
CREATE VIEW rw_view1 AS SELECT * FROM base_tbl WHERE a < b
WITH LOCAL CHECK OPTION;
\d+ rw_view1
SELECT * FROM information_schema.views WHERE table_name = 'rw_view1';
INSERT INTO rw_view1 VALUES(3,4); -- ok
INSERT INTO rw_view1 VALUES(4,3); -- should fail
INSERT INTO rw_view1 VALUES(5,null); -- should fail
UPDATE rw_view1 SET b = 5 WHERE a = 3; -- ok
UPDATE rw_view1 SET b = -5 WHERE a = 3; -- should fail
INSERT INTO rw_view1(a) VALUES (9); -- ok
INSERT INTO rw_view1(a) VALUES (10); -- should fail
SELECT * FROM base_tbl;
DROP TABLE base_tbl CASCADE;
-- WITH LOCAL/CASCADED CHECK OPTION
CREATE TABLE base_tbl (a int);
CREATE VIEW rw_view1 AS SELECT * FROM base_tbl WHERE a > 0;
CREATE VIEW rw_view2 AS SELECT * FROM rw_view1 WHERE a < 10
WITH CHECK OPTION; -- implicitly cascaded
\d+ rw_view2
SELECT * FROM information_schema.views WHERE table_name = 'rw_view2';
INSERT INTO rw_view2 VALUES (-5); -- should fail
INSERT INTO rw_view2 VALUES (5); -- ok
INSERT INTO rw_view2 VALUES (15); -- should fail
SELECT * FROM base_tbl;
UPDATE rw_view2 SET a = a - 10; -- should fail
UPDATE rw_view2 SET a = a + 10; -- should fail
CREATE OR REPLACE VIEW rw_view2 AS SELECT * FROM rw_view1 WHERE a < 10
WITH LOCAL CHECK OPTION;
\d+ rw_view2
SELECT * FROM information_schema.views WHERE table_name = 'rw_view2';
INSERT INTO rw_view2 VALUES (-10); -- ok, but not in view
INSERT INTO rw_view2 VALUES (20); -- should fail
SELECT * FROM base_tbl;
ALTER VIEW rw_view1 SET (check_option=here); -- invalid
ALTER VIEW rw_view1 SET (check_option=local);
INSERT INTO rw_view2 VALUES (-20); -- should fail
INSERT INTO rw_view2 VALUES (30); -- should fail
ALTER VIEW rw_view2 RESET (check_option);
\d+ rw_view2
SELECT * FROM information_schema.views WHERE table_name = 'rw_view2';
INSERT INTO rw_view2 VALUES (30); -- ok, but not in view
SELECT * FROM base_tbl;
DROP TABLE base_tbl CASCADE;
-- WITH CHECK OPTION with no local view qual
CREATE TABLE base_tbl (a int);
CREATE VIEW rw_view1 AS SELECT * FROM base_tbl WITH CHECK OPTION;
CREATE VIEW rw_view2 AS SELECT * FROM rw_view1 WHERE a > 0;
CREATE VIEW rw_view3 AS SELECT * FROM rw_view2 WITH CHECK OPTION;
SELECT * FROM information_schema.views WHERE table_name LIKE E'rw\_view_' ORDER BY table_name;
INSERT INTO rw_view1 VALUES (-1); -- ok
INSERT INTO rw_view1 VALUES (1); -- ok
INSERT INTO rw_view2 VALUES (-2); -- ok, but not in view
INSERT INTO rw_view2 VALUES (2); -- ok
INSERT INTO rw_view3 VALUES (-3); -- should fail
INSERT INTO rw_view3 VALUES (3); -- ok
DROP TABLE base_tbl CASCADE;
-- WITH CHECK OPTION with subquery
CREATE TABLE base_tbl (a int);
CREATE TABLE ref_tbl (a int PRIMARY KEY);
INSERT INTO ref_tbl SELECT * FROM generate_series(1,10);
CREATE VIEW rw_view1 AS
SELECT * FROM base_tbl b
WHERE EXISTS(SELECT 1 FROM ref_tbl r WHERE r.a = b.a)
WITH CHECK OPTION;
INSERT INTO rw_view1 VALUES (5); -- ok
INSERT INTO rw_view1 VALUES (15); -- should fail
UPDATE rw_view1 SET a = a + 5; -- ok
UPDATE rw_view1 SET a = a + 5; -- should fail
EXPLAIN (costs off) INSERT INTO rw_view1 VALUES (5);
EXPLAIN (costs off) UPDATE rw_view1 SET a = a + 5;
DROP TABLE base_tbl, ref_tbl CASCADE;
-- WITH CHECK OPTION with BEFORE trigger on base table
CREATE TABLE base_tbl (a int, b int);
CREATE FUNCTION base_tbl_trig_fn()
RETURNS trigger AS
$$
BEGIN
NEW.b := 10;
RETURN NEW;
END;
$$
LANGUAGE plpgsql;
CREATE TRIGGER base_tbl_trig BEFORE INSERT OR UPDATE ON base_tbl
FOR EACH ROW EXECUTE PROCEDURE base_tbl_trig_fn();
CREATE VIEW rw_view1 AS SELECT * FROM base_tbl WHERE a < b WITH CHECK OPTION;
INSERT INTO rw_view1 VALUES (5,0); -- ok
INSERT INTO rw_view1 VALUES (15, 20); -- should fail
UPDATE rw_view1 SET a = 20, b = 30; -- should fail
DROP TABLE base_tbl CASCADE;
DROP FUNCTION base_tbl_trig_fn();
-- WITH LOCAL CHECK OPTION with INSTEAD OF trigger on base view
CREATE TABLE base_tbl (a int, b int);
CREATE VIEW rw_view1 AS SELECT a FROM base_tbl WHERE a < b;
CREATE FUNCTION rw_view1_trig_fn()
RETURNS trigger AS
$$
BEGIN
IF TG_OP = 'INSERT' THEN
INSERT INTO base_tbl VALUES (NEW.a, 10);
RETURN NEW;
ELSIF TG_OP = 'UPDATE' THEN
UPDATE base_tbl SET a=NEW.a WHERE a=OLD.a;
RETURN NEW;
ELSIF TG_OP = 'DELETE' THEN
DELETE FROM base_tbl WHERE a=OLD.a;
RETURN OLD;
END IF;
END;
$$
LANGUAGE plpgsql;
CREATE TRIGGER rw_view1_trig
INSTEAD OF INSERT OR UPDATE OR DELETE ON rw_view1
FOR EACH ROW EXECUTE PROCEDURE rw_view1_trig_fn();
CREATE VIEW rw_view2 AS
SELECT * FROM rw_view1 WHERE a > 0 WITH LOCAL CHECK OPTION;
INSERT INTO rw_view2 VALUES (-5); -- should fail
INSERT INTO rw_view2 VALUES (5); -- ok
INSERT INTO rw_view2 VALUES (50); -- ok, but not in view
UPDATE rw_view2 SET a = a - 10; -- should fail
SELECT * FROM base_tbl;
-- Check option won't cascade down to base view with INSTEAD OF triggers
ALTER VIEW rw_view2 SET (check_option=cascaded);
INSERT INTO rw_view2 VALUES (100); -- ok, but not in view (doesn't fail rw_view1's check)
UPDATE rw_view2 SET a = 200 WHERE a = 5; -- ok, but not in view (doesn't fail rw_view1's check)
SELECT * FROM base_tbl;
-- Neither local nor cascaded check options work with INSTEAD rules
DROP TRIGGER rw_view1_trig ON rw_view1;
CREATE RULE rw_view1_ins_rule AS ON INSERT TO rw_view1
DO INSTEAD INSERT INTO base_tbl VALUES (NEW.a, 10);
CREATE RULE rw_view1_upd_rule AS ON UPDATE TO rw_view1
DO INSTEAD UPDATE base_tbl SET a=NEW.a WHERE a=OLD.a;
INSERT INTO rw_view2 VALUES (-10); -- ok, but not in view (doesn't fail rw_view2's check)
INSERT INTO rw_view2 VALUES (5); -- ok
INSERT INTO rw_view2 VALUES (20); -- ok, but not in view (doesn't fail rw_view1's check)
UPDATE rw_view2 SET a = 30 WHERE a = 5; -- ok, but not in view (doesn't fail rw_view1's check)
INSERT INTO rw_view2 VALUES (5); -- ok
UPDATE rw_view2 SET a = -5 WHERE a = 5; -- ok, but not in view (doesn't fail rw_view2's check)
SELECT * FROM base_tbl;
DROP TABLE base_tbl CASCADE;
DROP FUNCTION rw_view1_trig_fn();
CREATE TABLE base_tbl (a int);
CREATE VIEW rw_view1 AS SELECT a,10 AS b FROM base_tbl;
CREATE RULE rw_view1_ins_rule AS ON INSERT TO rw_view1
DO INSTEAD INSERT INTO base_tbl VALUES (NEW.a);
CREATE VIEW rw_view2 AS
SELECT * FROM rw_view1 WHERE a > b WITH LOCAL CHECK OPTION;
INSERT INTO rw_view2 VALUES (2,3); -- ok, but not in view (doesn't fail rw_view2's check)
DROP TABLE base_tbl CASCADE;