Prevent adding relations to a concurrently dropped schema.

In the previous coding, it was possible for a relation to be created
via CREATE TABLE, CREATE VIEW, CREATE SEQUENCE, CREATE FOREIGN TABLE,
etc.  in a schema while that schema was meanwhile being concurrently
dropped.  This led to a pg_class entry with an invalid relnamespace
value.  The same problem could occur if a relation was moved using
ALTER .. SET SCHEMA while the target schema was being concurrently
dropped.  This patch prevents both of those scenarios by locking the
schema to which the relation is being added using AccessShareLock,
which conflicts with the AccessExclusiveLock taken by DROP.

As a desirable side effect, this also prevents the use of CREATE OR
REPLACE VIEW to queue for an AccessExclusiveLock on a relation on which
you have no rights: that will now fail immediately with a permissions
error, before trying to obtain a lock.

We need similar protection for all other object types, but as everything
other than relations uses a slightly different set of code paths, I'm
leaving that for a separate commit.

Original complaint (as far as I could find) about CREATE by Nikhil
Sontakke; risk for ALTER .. SET SCHEMA pointed out by Tom Lane;
further details by Dan Farina; patch by me; review by Hitoshi Harada.
This commit is contained in:
Robert Haas 2012-01-16 09:34:21 -05:00
parent 01d83ffdca
commit 1575fbcb79
7 changed files with 159 additions and 60 deletions

View File

@ -480,31 +480,131 @@ RangeVarGetCreationNamespace(const RangeVar *newRelation)
/*
* RangeVarGetAndCheckCreationNamespace
* As RangeVarGetCreationNamespace, but with a permissions check.
*
* This function returns the OID of the namespace in which a new relation
* with a given name should be created. If the user does not have CREATE
* permission on the target namespace, this function will instead signal
* an ERROR.
*
* If non-NULL, *existing_oid is set to the OID of any existing relation with
* the same name which already exists in that namespace, or to InvalidOid if
* no such relation exists.
*
* If lockmode != NoLock, the specified lock mode is acquire on the existing
* relation, if any, provided that the current user owns the target relation.
* However, if lockmode != NoLock and the user does not own the target
* relation, we throw an ERROR, as we must not try to lock relations the
* user does not have permissions on.
*
* As a side effect, this function acquires AccessShareLock on the target
* namespace. Without this, the namespace could be dropped before our
* transaction commits, leaving behind relations with relnamespace pointing
* to a no-longer-exstant namespace.
*
* As a further side-effect, if the select namespace is a temporary namespace,
* we mark the RangeVar as RELPERSISTENCE_TEMP.
*/
Oid
RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation)
RangeVarGetAndCheckCreationNamespace(RangeVar *relation,
LOCKMODE lockmode,
Oid *existing_relation_id)
{
Oid namespaceId;
namespaceId = RangeVarGetCreationNamespace(newRelation);
uint64 inval_count;
Oid relid;
Oid oldrelid = InvalidOid;
Oid nspid;
Oid oldnspid = InvalidOid;
bool retry = false;
/*
* Check we have permission to create there. Skip check if bootstrapping,
* since permissions machinery may not be working yet.
* We check the catalog name and then ignore it.
*/
if (!IsBootstrapProcessingMode())
if (relation->catalogname)
{
if (strcmp(relation->catalogname, get_database_name(MyDatabaseId)) != 0)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cross-database references are not implemented: \"%s.%s.%s\"",
relation->catalogname, relation->schemaname,
relation->relname)));
}
/*
* As in RangeVarGetRelidExtended(), we guard against concurrent DDL
* operations by tracking whether any invalidation messages are processed
* while we're doing the name lookups and acquiring locks. See comments
* in that function for a more detailed explanation of this logic.
*/
for (;;)
{
AclResult aclresult;
aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
ACL_CREATE);
inval_count = SharedInvalidMessageCounter;
/* Look up creation namespace and check for existing relation. */
nspid = RangeVarGetCreationNamespace(relation);
Assert(OidIsValid(nspid));
if (existing_relation_id != NULL)
relid = get_relname_relid(relation->relname, nspid);
else
relid = InvalidOid;
/*
* In bootstrap processing mode, we don't bother with permissions
* or locking. Permissions might not be working yet, and locking is
* unnecessary.
*/
if (IsBootstrapProcessingMode())
break;
/* Check namespace permissions. */
aclresult = pg_namespace_aclcheck(nspid, GetUserId(), ACL_CREATE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
get_namespace_name(namespaceId));
get_namespace_name(nspid));
if (retry)
{
/* If nothing changed, we're done. */
if (relid == oldrelid && nspid == oldnspid)
break;
/* If creation namespace has changed, give up old lock. */
if (nspid != oldnspid)
UnlockDatabaseObject(NamespaceRelationId, oldnspid, 0,
AccessShareLock);
/* If name points to something different, give up old lock. */
if (relid != oldrelid && OidIsValid(oldrelid) && lockmode != NoLock)
UnlockRelationOid(oldrelid, lockmode);
}
/* Lock namespace. */
if (nspid != oldnspid)
LockDatabaseObject(NamespaceRelationId, nspid, 0, AccessShareLock);
/* Lock relation, if required if and we have permission. */
if (lockmode != NoLock && OidIsValid(relid))
{
if (!pg_class_ownercheck(relid, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
relation->relname);
if (relid != oldrelid)
LockRelationOid(relid, lockmode);
}
/* If no invalidation message were processed, we're done! */
if (inval_count == SharedInvalidMessageCounter)
break;
/* Something may have changed, so recheck our work. */
retry = true;
oldrelid = relid;
oldnspid = nspid;
}
return namespaceId;
RangeVarAdjustRelationPersistence(relation, nspid);
if (existing_relation_id != NULL)
*existing_relation_id = relid;
return nspid;
}
/*

View File

@ -451,10 +451,12 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId)
/*
* Look up the namespace in which we are supposed to create the relation,
* and check we have permission to create there.
* check we have permission to create there, lock it against concurrent
* drop, and mark stmt->relation as RELPERSISTENCE_TEMP if a temporary
* namespace is selected.
*/
namespaceId = RangeVarGetAndCheckCreationNamespace(stmt->relation);
RangeVarAdjustRelationPersistence(stmt->relation, namespaceId);
namespaceId =
RangeVarGetAndCheckCreationNamespace(stmt->relation, NoLock, NULL);
/*
* Security check: disallow creating temp tables from security-restricted
@ -9417,6 +9419,7 @@ AlterTableNamespace(AlterObjectSchemaStmt *stmt)
Oid oldNspOid;
Oid nspOid;
Relation classRel;
RangeVar *newrv;
relid = RangeVarGetRelidExtended(stmt->relation, AccessExclusiveLock,
false, false,
@ -9441,8 +9444,9 @@ AlterTableNamespace(AlterObjectSchemaStmt *stmt)
get_rel_name(tableId))));
}
/* get schema OID and check its permissions */
nspOid = LookupCreationNamespace(stmt->newschema);
/* Get and lock schema OID and check its permissions. */
newrv = makeRangeVar(stmt->newschema, RelationGetRelationName(rel), -1);
nspOid = RangeVarGetAndCheckCreationNamespace(newrv, NoLock, NULL);
/* common checks on switching namespaces */
CheckSetNamespace(oldNspOid, nspOid, RelationRelationId, relid);

View File

@ -2005,7 +2005,8 @@ DefineCompositeType(const RangeVar *typevar, List *coldeflist)
* check is here mainly to get a better error message about a "type"
* instead of below about a "relation".
*/
typeNamespace = RangeVarGetCreationNamespace(createStmt->relation);
typeNamespace = RangeVarGetAndCheckCreationNamespace(createStmt->relation,
NoLock, NULL);
RangeVarAdjustRelationPersistence(createStmt->relation, typeNamespace);
old_type_oid =
GetSysCacheOid2(TYPENAMENSP,

View File

@ -98,10 +98,12 @@ isViewOnTempTable_walker(Node *node, void *context)
*---------------------------------------------------------------------
*/
static Oid
DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace,
Oid namespaceId, List *options)
DefineVirtualRelation(RangeVar *relation, List *tlist, bool replace,
List *options)
{
Oid viewOid;
Oid namespaceId;
LOCKMODE lockmode;
CreateStmt *createStmt = makeNode(CreateStmt);
List *attrList;
ListCell *t;
@ -159,9 +161,14 @@ DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace,
errmsg("view must have at least one column")));
/*
* Check to see if we want to replace an existing view.
* Look up, check permissions on, and lock the creation namespace; also
* check for a preexisting view with the same name. This will also set
* relation->relpersistence to RELPERSISTENCE_TEMP if the selected
* namespace is temporary.
*/
viewOid = get_relname_relid(relation->relname, namespaceId);
lockmode = replace ? AccessExclusiveLock : NoLock;
namespaceId =
RangeVarGetAndCheckCreationNamespace(relation, lockmode, &viewOid);
if (OidIsValid(viewOid) && replace)
{
@ -170,24 +177,16 @@ DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace,
List *atcmds = NIL;
AlterTableCmd *atcmd;
/*
* Yes. Get exclusive lock on the existing view ...
*/
rel = relation_open(viewOid, AccessExclusiveLock);
/* Relation is already locked, but we must build a relcache entry. */
rel = relation_open(viewOid, NoLock);
/*
* Make sure it *is* a view, and do permissions checks.
*/
/* Make sure it *is* a view. */
if (rel->rd_rel->relkind != RELKIND_VIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a view",
RelationGetRelationName(rel))));
if (!pg_class_ownercheck(viewOid, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
RelationGetRelationName(rel));
/* Also check it's not in use already */
CheckTableNotInUse(rel, "CREATE OR REPLACE VIEW");
@ -428,7 +427,6 @@ DefineView(ViewStmt *stmt, const char *queryString)
{
Query *viewParse;
Oid viewOid;
Oid namespaceId;
RangeVar *view;
/*
@ -514,10 +512,6 @@ DefineView(ViewStmt *stmt, const char *queryString)
view->relname)));
}
/* Might also need to make it temporary if placed in temp schema. */
namespaceId = RangeVarGetCreationNamespace(view);
RangeVarAdjustRelationPersistence(view, namespaceId);
/*
* Create the view relation
*
@ -525,7 +519,7 @@ DefineView(ViewStmt *stmt, const char *queryString)
* aborted.
*/
viewOid = DefineVirtualRelation(view, viewParse->targetList,
stmt->replace, namespaceId, stmt->options);
stmt->replace, stmt->options);
/*
* The relation we have just created is not visible to any other commands

View File

@ -2532,11 +2532,13 @@ OpenIntoRel(QueryDesc *queryDesc)
}
/*
* Find namespace to create in, check its permissions
* Find namespace to create in, check its permissions, lock it against
* concurrent drop, and mark into->rel as RELPERSISTENCE_TEMP if the
* selected namespace is temporary.
*/
intoName = into->rel->relname;
namespaceId = RangeVarGetAndCheckCreationNamespace(into->rel);
RangeVarAdjustRelationPersistence(into->rel, namespaceId);
namespaceId = RangeVarGetAndCheckCreationNamespace(into->rel, NoLock,
NULL);
/*
* Security check: disallow creating temp tables from security-restricted

View File

@ -146,6 +146,7 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
List *save_alist;
ListCell *elements;
Oid namespaceid;
Oid existing_relid;
/*
* We must not scribble on the passed-in CreateStmt, so copy it. (This is
@ -155,30 +156,25 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
/*
* Look up the creation namespace. This also checks permissions on the
* target namespace, so that we throw any permissions error as early as
* possible.
* target namespace, locks it against concurrent drops, checks for a
* preexisting relation in that namespace with the same name, and updates
* stmt->relation->relpersistence if the select namespace is temporary.
*/
namespaceid = RangeVarGetAndCheckCreationNamespace(stmt->relation);
RangeVarAdjustRelationPersistence(stmt->relation, namespaceid);
namespaceid =
RangeVarGetAndCheckCreationNamespace(stmt->relation, NoLock,
&existing_relid);
/*
* If the relation already exists and the user specified "IF NOT EXISTS",
* bail out with a NOTICE.
*/
if (stmt->if_not_exists)
if (stmt->if_not_exists && OidIsValid(existing_relid))
{
Oid existing_relid;
existing_relid = get_relname_relid(stmt->relation->relname,
namespaceid);
if (existing_relid != InvalidOid)
{
ereport(NOTICE,
(errcode(ERRCODE_DUPLICATE_TABLE),
errmsg("relation \"%s\" already exists, skipping",
stmt->relation->relname)));
return NIL;
}
ereport(NOTICE,
(errcode(ERRCODE_DUPLICATE_TABLE),
errmsg("relation \"%s\" already exists, skipping",
stmt->relation->relname)));
return NIL;
}
/*

View File

@ -58,7 +58,9 @@ extern Oid RangeVarGetRelidExtended(const RangeVar *relation,
RangeVarGetRelidCallback callback,
void *callback_arg);
extern Oid RangeVarGetCreationNamespace(const RangeVar *newRelation);
extern Oid RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation);
extern Oid RangeVarGetAndCheckCreationNamespace(RangeVar *newRelation,
LOCKMODE lockmode,
Oid *existing_relation_id);
extern void RangeVarAdjustRelationPersistence(RangeVar *newRelation, Oid nspid);
extern Oid RelnameGetRelid(const char *relname);
extern bool RelationIsVisible(Oid relid);