From b59c03f84206ed3f3f864dcbe337ec04b2a8558d Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Sun, 9 Jul 2000 04:56:32 +0000 Subject: [PATCH] Make view/rule permission checking behave properly with subqueries in the rule. --- src/backend/rewrite/locks.c | 182 +++++++++++++++++++++++++----------- 1 file changed, 126 insertions(+), 56 deletions(-) diff --git a/src/backend/rewrite/locks.c b/src/backend/rewrite/locks.c index 57cce96d90..5054b21543 100644 --- a/src/backend/rewrite/locks.c +++ b/src/backend/rewrite/locks.c @@ -7,7 +7,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/rewrite/Attic/locks.c,v 1.29 2000/05/30 00:49:51 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/rewrite/Attic/locks.c,v 1.30 2000/07/09 04:56:32 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -17,9 +17,9 @@ #include "catalog/pg_shadow.h" #include "optimizer/clauses.h" #include "rewrite/locks.h" +#include "parser/parsetree.h" #include "utils/acl.h" #include "utils/syscache.h" -#include "utils/syscache.h" /* @@ -152,39 +152,148 @@ matchLocks(CmdType event, } +/* + * Check the access permissions of tables that are referred to by a rule. + * We want to check the access permissions using the userid of the rule's + * owner, *not* of the current user (the one accessing the rule). So, we + * do the permission check here and set skipAcl = TRUE in each of the rule's + * RTEs, to prevent the executor from running another check with the current + * user's ID. + * + * XXX This routine is called before the rule's query tree has been copied + * out of the relcache entry where it is kept. Therefore, when we set + * skipAcl = TRUE, we are destructively modifying the relcache entry for + * the event relation! This seems fairly harmless because the relcache + * querytree is only used as a source for the rewriter, but it's a tad + * unclean anyway. + * + * Note that we must check permissions every time, even if skipAcl was + * already set TRUE by a prior call. This ensures that we enforce the + * current permission settings for each referenced table, even if they + * have changed since the relcache entry was loaded. + */ + +typedef struct +{ + char *evowner; +} checkLockPerms_context; + +static bool +checkLockPerms_walker(Node *node, + checkLockPerms_context *context) +{ + if (node == NULL) + return false; + if (IsA(node, SubLink)) + { + /* + * Standard expression_tree_walker will not recurse into + * subselect, but here we must do so. + */ + SubLink *sub = (SubLink *) node; + + if (checkLockPerms_walker((Node *) (sub->lefthand), context)) + return true; + if (checkLockPerms_walker((Node *) (sub->subselect), context)) + return true; + return false; + } + if (IsA(node, Query)) + { + /* Reach here after recursing down into subselect above... */ + Query *qry = (Query *) node; + int rtablength = length(qry->rtable); + int i; + + /* Check all the RTEs in this query node, except OLD and NEW */ + for (i = 1; i <= rtablength; i++) + { + RangeTblEntry *rte = rt_fetch(i, qry->rtable); + int32 reqperm; + int32 aclcheck_res; + + if (rte->ref != NULL) + { + if (strcmp(rte->ref->relname, "*NEW*") == 0) + continue; + if (strcmp(rte->ref->relname, "*OLD*") == 0) + continue; + } + + if (i == qry->resultRelation) + switch (qry->commandType) + { + case CMD_INSERT: + reqperm = ACL_AP; + break; + default: + reqperm = ACL_WR; + break; + } + else + reqperm = ACL_RD; + + aclcheck_res = pg_aclcheck(rte->relname, + context->evowner, + reqperm); + if (aclcheck_res != ACLCHECK_OK) + elog(ERROR, "%s: %s", + rte->relname, + aclcheck_error_strings[aclcheck_res]); + + /* + * Mark RTE to prevent executor from checking again with the + * current user's ID... + */ + rte->skipAcl = true; + } + + /* If there are sublinks, search for them and check their RTEs */ + if (qry->hasSubLinks) + { + if (checkLockPerms_walker((Node *) (qry->targetList), context)) + return true; + if (checkLockPerms_walker((Node *) (qry->qual), context)) + return true; + if (checkLockPerms_walker((Node *) (qry->havingQual), context)) + return true; + } + return false; + } + return expression_tree_walker(node, checkLockPerms_walker, + (void *) context); +} + void checkLockPerms(List *locks, Query *parsetree, int rt_index) { + RangeTblEntry *rte; Relation ev_rel; HeapTuple usertup; - char *evowner; - RangeTblEntry *rte; - int32 reqperm; - int32 aclcheck_res; - int i; + Form_pg_shadow userform; + checkLockPerms_context context; List *l; if (locks == NIL) - return; + return; /* nothing to check */ /* - * Get the usename of the rules event relation owner + * Get the usename of the rule's event relation owner */ - rte = (RangeTblEntry *) nth(rt_index - 1, parsetree->rtable); + rte = rt_fetch(rt_index, parsetree->rtable); ev_rel = heap_openr(rte->relname, AccessShareLock); usertup = SearchSysCacheTuple(SHADOWSYSID, ObjectIdGetDatum(ev_rel->rd_rel->relowner), 0, 0, 0); if (!HeapTupleIsValid(usertup)) - { elog(ERROR, "cache lookup for userid %d failed", ev_rel->rd_rel->relowner); - } + userform = (Form_pg_shadow) GETSTRUCT(usertup); + context.evowner = pstrdup(NameStr(userform->usename)); heap_close(ev_rel, AccessShareLock); - evowner = pstrdup(NameStr(((Form_pg_shadow) GETSTRUCT(usertup))->usename)); /* - * Check all the locks, that should get fired on this query + * Check all the locks that should get fired on this query */ foreach(l, locks) { @@ -192,53 +301,14 @@ checkLockPerms(List *locks, Query *parsetree, int rt_index) List *action; /* - * In each lock check every action + * In each lock check every action. We must scan the action + * recursively in case there are any sub-queries within it. */ foreach(action, onelock->actions) { Query *query = (Query *) lfirst(action); - /* - * In each action check every rangetable entry for read/write - * permission of the event relations owner depending on if - * it's the result relation (write) or not (read) - */ - for (i = 2; i < length(query->rtable); i++) - { - if (i + 1 == query->resultRelation) - switch (query->resultRelation) - { - case CMD_INSERT: - reqperm = ACL_AP; - break; - default: - reqperm = ACL_WR; - break; - } - else - reqperm = ACL_RD; - - rte = (RangeTblEntry *) nth(i, query->rtable); - aclcheck_res = pg_aclcheck(rte->relname, - evowner, reqperm); - if (aclcheck_res != ACLCHECK_OK) - { - elog(ERROR, "%s: %s", - rte->relname, - aclcheck_error_strings[aclcheck_res]); - } - - /* - * So this is allowed due to the permissions of the rules - * event relation owner. But let's see if the next one too - */ - rte->skipAcl = TRUE; - } + checkLockPerms_walker((Node *) query, &context); } } - - /* - * Phew, that was close - */ - return; }