Add more infinite recursion detection while locking a view.

Also add regression test cases for detecting infinite recursion in
locking view tests.  Some document enhancements. Patch by Yugo Nagata.
This commit is contained in:
Tatsuo Ishii 2018-04-17 16:59:17 +09:00
parent 47c91b5599
commit 03030512d1
4 changed files with 63 additions and 31 deletions

View File

@ -46,8 +46,8 @@ LOCK [ TABLE ] [ ONLY ] <replaceable class="parameter">name</replaceable> [ * ]
</para> </para>
<para> <para>
When a view is specified to be locked, all relations appearing in the view When a view is locked, all relations appearing in the view definition
definition query are also locked recursively with the same lock mode. query are also locked recursively with the same lock mode.
</para> </para>
<para> <para>
@ -173,6 +173,13 @@ LOCK [ TABLE ] [ ONLY ] <replaceable class="parameter">name</replaceable> [ * ]
or <literal>TRUNCATE</literal> privileges. or <literal>TRUNCATE</literal> privileges.
</para> </para>
<para>
The user performing the lock on the view must have the corresponding privilege
on the view. In addition the view's owner must have the relevant privileges on
the underlying base relations, but the user performing the lock does
not need any permissions on the underlying base relations.
</para>
<para> <para>
<command>LOCK TABLE</command> is useless outside a transaction block: the lock <command>LOCK TABLE</command> is useless outside a transaction block: the lock
would remain held only to the completion of the statement. Therefore would remain held only to the completion of the statement. Therefore

View File

@ -31,7 +31,7 @@ static void LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait, Oid use
static AclResult LockTableAclCheck(Oid relid, LOCKMODE lockmode, Oid userid); static AclResult LockTableAclCheck(Oid relid, LOCKMODE lockmode, Oid userid);
static void RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid, static void RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid,
Oid oldrelid, void *arg); Oid oldrelid, void *arg);
static void LockViewRecurse(Oid reloid, Oid root_reloid, LOCKMODE lockmode, bool nowait); static void LockViewRecurse(Oid reloid, LOCKMODE lockmode, bool nowait, List *ancestor_views);
/* /*
* LOCK TABLE * LOCK TABLE
@ -67,7 +67,7 @@ LockTableCommand(LockStmt *lockstmt)
(void *) &lockstmt->mode); (void *) &lockstmt->mode);
if (get_rel_relkind(reloid) == RELKIND_VIEW) if (get_rel_relkind(reloid) == RELKIND_VIEW)
LockViewRecurse(reloid, reloid, lockstmt->mode, lockstmt->nowait); LockViewRecurse(reloid, lockstmt->mode, lockstmt->nowait, NIL);
else if (recurse) else if (recurse)
LockTableRecurse(reloid, lockstmt->mode, lockstmt->nowait, GetUserId()); LockTableRecurse(reloid, lockstmt->mode, lockstmt->nowait, GetUserId());
} }
@ -92,7 +92,6 @@ RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid, Oid oldrelid,
return; /* woops, concurrently dropped; no permissions return; /* woops, concurrently dropped; no permissions
* check */ * check */
/* Currently, we only allow plain tables or views to be locked */ /* Currently, we only allow plain tables or views to be locked */
if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE && if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE &&
relkind != RELKIND_VIEW) relkind != RELKIND_VIEW)
@ -178,11 +177,11 @@ LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait, Oid userid)
typedef struct typedef struct
{ {
Oid root_reloid; LOCKMODE lockmode; /* lock mode to use */
LOCKMODE lockmode; bool nowait; /* no wait mode */
bool nowait; Oid viewowner; /* view owner for checking the privilege */
Oid viewowner; Oid viewoid; /* OID of the view to be locked */
Oid viewoid; List *ancestor_views; /* OIDs of ancestor views */
} LockViewRecurse_context; } LockViewRecurse_context;
static bool static bool
@ -193,19 +192,22 @@ LockViewRecurse_walker(Node *node, LockViewRecurse_context *context)
if (IsA(node, Query)) if (IsA(node, Query))
{ {
Query *query = (Query *) node; Query *query = (Query *) node;
ListCell *rtable; ListCell *rtable;
foreach(rtable, query->rtable) foreach(rtable, query->rtable)
{ {
RangeTblEntry *rte = lfirst(rtable); RangeTblEntry *rte = lfirst(rtable);
AclResult aclresult; AclResult aclresult;
Oid relid = rte->relid; Oid relid = rte->relid;
char relkind = rte->relkind; char relkind = rte->relkind;
char *relname = get_rel_name(relid); char *relname = get_rel_name(relid);
/* The OLD and NEW placeholder entries in the view's rtable are skipped. */ /*
* The OLD and NEW placeholder entries in the view's rtable are
* skipped.
*/
if (relid == context->viewoid && if (relid == context->viewoid &&
(!strcmp(rte->eref->aliasname, "old") || !strcmp(rte->eref->aliasname, "new"))) (!strcmp(rte->eref->aliasname, "old") || !strcmp(rte->eref->aliasname, "new")))
continue; continue;
@ -216,11 +218,11 @@ LockViewRecurse_walker(Node *node, LockViewRecurse_context *context)
continue; continue;
/* Check infinite recursion in the view definition. */ /* Check infinite recursion in the view definition. */
if (relid == context->root_reloid) if (list_member_oid(context->ancestor_views, relid))
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION), (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("infinite recursion detected in rules for relation \"%s\"", errmsg("infinite recursion detected in rules for relation \"%s\"",
get_rel_name(context->root_reloid)))); get_rel_name(relid))));
/* Check permissions with the view owner's privilege. */ /* Check permissions with the view owner's privilege. */
aclresult = LockTableAclCheck(relid, context->lockmode, context->viewowner); aclresult = LockTableAclCheck(relid, context->lockmode, context->viewowner);
@ -233,11 +235,11 @@ LockViewRecurse_walker(Node *node, LockViewRecurse_context *context)
else if (!ConditionalLockRelationOid(relid, context->lockmode)) else if (!ConditionalLockRelationOid(relid, context->lockmode))
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_LOCK_NOT_AVAILABLE), (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
errmsg("could not obtain lock on relation \"%s\"", errmsg("could not obtain lock on relation \"%s\"",
relname))); relname)));
if (relkind == RELKIND_VIEW) if (relkind == RELKIND_VIEW)
LockViewRecurse(relid, context->root_reloid, context->lockmode, context->nowait); LockViewRecurse(relid, context->lockmode, context->nowait, context->ancestor_views);
else if (rte->inh) else if (rte->inh)
LockTableRecurse(relid, context->lockmode, context->nowait, context->viewowner); LockTableRecurse(relid, context->lockmode, context->nowait, context->viewowner);
} }
@ -254,24 +256,26 @@ LockViewRecurse_walker(Node *node, LockViewRecurse_context *context)
} }
static void static void
LockViewRecurse(Oid reloid, Oid root_reloid, LOCKMODE lockmode, bool nowait) LockViewRecurse(Oid reloid, LOCKMODE lockmode, bool nowait, List *ancestor_views)
{ {
LockViewRecurse_context context; LockViewRecurse_context context;
Relation view; Relation view;
Query *viewquery; Query *viewquery;
view = heap_open(reloid, NoLock); view = heap_open(reloid, NoLock);
viewquery = get_view_query(view); viewquery = get_view_query(view);
context.root_reloid = root_reloid;
context.lockmode = lockmode; context.lockmode = lockmode;
context.nowait = nowait; context.nowait = nowait;
context.viewowner = view->rd_rel->relowner; context.viewowner = view->rd_rel->relowner;
context.viewoid = reloid; context.viewoid = reloid;
context.ancestor_views = lcons_oid(reloid, ancestor_views);
LockViewRecurse_walker((Node *) viewquery, &context); LockViewRecurse_walker((Node *) viewquery, &context);
ancestor_views = list_delete_oid(ancestor_views, reloid);
heap_close(view, NoLock); heap_close(view, NoLock);
} }

View File

@ -120,6 +120,17 @@ select relname from pg_locks l, pg_class c
lock_view6 lock_view6
(2 rows) (2 rows)
ROLLBACK;
-- detecting infinite recursions in view definitions
CREATE OR REPLACE VIEW lock_view2 AS SELECT * from lock_view3;
BEGIN TRANSACTION;
LOCK TABLE lock_view2 IN EXCLUSIVE MODE;
ERROR: infinite recursion detected in rules for relation "lock_view2"
ROLLBACK;
CREATE VIEW lock_view7 AS SELECT * from lock_view2;
BEGIN TRANSACTION;
LOCK TABLE lock_view7 IN EXCLUSIVE MODE;
ERROR: infinite recursion detected in rules for relation "lock_view2"
ROLLBACK; ROLLBACK;
-- Verify that we can lock a table with inheritance children. -- Verify that we can lock a table with inheritance children.
CREATE TABLE lock_tbl2 (b BIGINT) INHERITS (lock_tbl1); CREATE TABLE lock_tbl2 (b BIGINT) INHERITS (lock_tbl1);
@ -142,11 +153,12 @@ RESET ROLE;
-- --
-- Clean up -- Clean up
-- --
DROP VIEW lock_view7;
DROP VIEW lock_view6; DROP VIEW lock_view6;
DROP VIEW lock_view5; DROP VIEW lock_view5;
DROP VIEW lock_view4; DROP VIEW lock_view4;
DROP VIEW lock_view3; DROP VIEW lock_view3 CASCADE;
DROP VIEW lock_view2; NOTICE: drop cascades to view lock_view2
DROP VIEW lock_view1; DROP VIEW lock_view1;
DROP TABLE lock_tbl3; DROP TABLE lock_tbl3;
DROP TABLE lock_tbl2; DROP TABLE lock_tbl2;

View File

@ -84,6 +84,15 @@ select relname from pg_locks l, pg_class c
where l.relation = c.oid and relname like '%lock_%' and mode = 'ExclusiveLock' where l.relation = c.oid and relname like '%lock_%' and mode = 'ExclusiveLock'
order by relname; order by relname;
ROLLBACK; ROLLBACK;
-- detecting infinite recursions in view definitions
CREATE OR REPLACE VIEW lock_view2 AS SELECT * from lock_view3;
BEGIN TRANSACTION;
LOCK TABLE lock_view2 IN EXCLUSIVE MODE;
ROLLBACK;
CREATE VIEW lock_view7 AS SELECT * from lock_view2;
BEGIN TRANSACTION;
LOCK TABLE lock_view7 IN EXCLUSIVE MODE;
ROLLBACK;
-- Verify that we can lock a table with inheritance children. -- Verify that we can lock a table with inheritance children.
CREATE TABLE lock_tbl2 (b BIGINT) INHERITS (lock_tbl1); CREATE TABLE lock_tbl2 (b BIGINT) INHERITS (lock_tbl1);
@ -107,11 +116,11 @@ RESET ROLE;
-- --
-- Clean up -- Clean up
-- --
DROP VIEW lock_view7;
DROP VIEW lock_view6; DROP VIEW lock_view6;
DROP VIEW lock_view5; DROP VIEW lock_view5;
DROP VIEW lock_view4; DROP VIEW lock_view4;
DROP VIEW lock_view3; DROP VIEW lock_view3 CASCADE;
DROP VIEW lock_view2;
DROP VIEW lock_view1; DROP VIEW lock_view1;
DROP TABLE lock_tbl3; DROP TABLE lock_tbl3;
DROP TABLE lock_tbl2; DROP TABLE lock_tbl2;