From 1575fbcb795fc331f46588b4520c4bca7e854d5c Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Mon, 16 Jan 2012 09:34:21 -0500 Subject: [PATCH] Prevent adding relations to a concurrently dropped schema. In the previous coding, it was possible for a relation to be created via CREATE TABLE, CREATE VIEW, CREATE SEQUENCE, CREATE FOREIGN TABLE, etc. in a schema while that schema was meanwhile being concurrently dropped. This led to a pg_class entry with an invalid relnamespace value. The same problem could occur if a relation was moved using ALTER .. SET SCHEMA while the target schema was being concurrently dropped. This patch prevents both of those scenarios by locking the schema to which the relation is being added using AccessShareLock, which conflicts with the AccessExclusiveLock taken by DROP. As a desirable side effect, this also prevents the use of CREATE OR REPLACE VIEW to queue for an AccessExclusiveLock on a relation on which you have no rights: that will now fail immediately with a permissions error, before trying to obtain a lock. We need similar protection for all other object types, but as everything other than relations uses a slightly different set of code paths, I'm leaving that for a separate commit. Original complaint (as far as I could find) about CREATE by Nikhil Sontakke; risk for ALTER .. SET SCHEMA pointed out by Tom Lane; further details by Dan Farina; patch by me; review by Hitoshi Harada. --- src/backend/catalog/namespace.c | 124 ++++++++++++++++++++++++++--- src/backend/commands/tablecmds.c | 14 ++-- src/backend/commands/typecmds.c | 3 +- src/backend/commands/view.c | 36 ++++----- src/backend/executor/execMain.c | 8 +- src/backend/parser/parse_utilcmd.c | 30 +++---- src/include/catalog/namespace.h | 4 +- 7 files changed, 159 insertions(+), 60 deletions(-) diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c index a9a64fe5f2..80d6fc7d0c 100644 --- a/src/backend/catalog/namespace.c +++ b/src/backend/catalog/namespace.c @@ -480,31 +480,131 @@ RangeVarGetCreationNamespace(const RangeVar *newRelation) /* * RangeVarGetAndCheckCreationNamespace - * As RangeVarGetCreationNamespace, but with a permissions check. + * + * This function returns the OID of the namespace in which a new relation + * with a given name should be created. If the user does not have CREATE + * permission on the target namespace, this function will instead signal + * an ERROR. + * + * If non-NULL, *existing_oid is set to the OID of any existing relation with + * the same name which already exists in that namespace, or to InvalidOid if + * no such relation exists. + * + * If lockmode != NoLock, the specified lock mode is acquire on the existing + * relation, if any, provided that the current user owns the target relation. + * However, if lockmode != NoLock and the user does not own the target + * relation, we throw an ERROR, as we must not try to lock relations the + * user does not have permissions on. + * + * As a side effect, this function acquires AccessShareLock on the target + * namespace. Without this, the namespace could be dropped before our + * transaction commits, leaving behind relations with relnamespace pointing + * to a no-longer-exstant namespace. + * + * As a further side-effect, if the select namespace is a temporary namespace, + * we mark the RangeVar as RELPERSISTENCE_TEMP. */ Oid -RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation) +RangeVarGetAndCheckCreationNamespace(RangeVar *relation, + LOCKMODE lockmode, + Oid *existing_relation_id) { - Oid namespaceId; - - namespaceId = RangeVarGetCreationNamespace(newRelation); + uint64 inval_count; + Oid relid; + Oid oldrelid = InvalidOid; + Oid nspid; + Oid oldnspid = InvalidOid; + bool retry = false; /* - * Check we have permission to create there. Skip check if bootstrapping, - * since permissions machinery may not be working yet. + * We check the catalog name and then ignore it. */ - if (!IsBootstrapProcessingMode()) + if (relation->catalogname) + { + if (strcmp(relation->catalogname, get_database_name(MyDatabaseId)) != 0) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cross-database references are not implemented: \"%s.%s.%s\"", + relation->catalogname, relation->schemaname, + relation->relname))); + } + + /* + * As in RangeVarGetRelidExtended(), we guard against concurrent DDL + * operations by tracking whether any invalidation messages are processed + * while we're doing the name lookups and acquiring locks. See comments + * in that function for a more detailed explanation of this logic. + */ + for (;;) { AclResult aclresult; - aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(), - ACL_CREATE); + inval_count = SharedInvalidMessageCounter; + + /* Look up creation namespace and check for existing relation. */ + nspid = RangeVarGetCreationNamespace(relation); + Assert(OidIsValid(nspid)); + if (existing_relation_id != NULL) + relid = get_relname_relid(relation->relname, nspid); + else + relid = InvalidOid; + + /* + * In bootstrap processing mode, we don't bother with permissions + * or locking. Permissions might not be working yet, and locking is + * unnecessary. + */ + if (IsBootstrapProcessingMode()) + break; + + /* Check namespace permissions. */ + aclresult = pg_namespace_aclcheck(nspid, GetUserId(), ACL_CREATE); if (aclresult != ACLCHECK_OK) aclcheck_error(aclresult, ACL_KIND_NAMESPACE, - get_namespace_name(namespaceId)); + get_namespace_name(nspid)); + + if (retry) + { + /* If nothing changed, we're done. */ + if (relid == oldrelid && nspid == oldnspid) + break; + /* If creation namespace has changed, give up old lock. */ + if (nspid != oldnspid) + UnlockDatabaseObject(NamespaceRelationId, oldnspid, 0, + AccessShareLock); + /* If name points to something different, give up old lock. */ + if (relid != oldrelid && OidIsValid(oldrelid) && lockmode != NoLock) + UnlockRelationOid(oldrelid, lockmode); + } + + /* Lock namespace. */ + if (nspid != oldnspid) + LockDatabaseObject(NamespaceRelationId, nspid, 0, AccessShareLock); + + /* Lock relation, if required if and we have permission. */ + if (lockmode != NoLock && OidIsValid(relid)) + { + if (!pg_class_ownercheck(relid, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, + relation->relname); + if (relid != oldrelid) + LockRelationOid(relid, lockmode); + } + + /* If no invalidation message were processed, we're done! */ + if (inval_count == SharedInvalidMessageCounter) + break; + + /* Something may have changed, so recheck our work. */ + retry = true; + oldrelid = relid; + oldnspid = nspid; } - return namespaceId; + RangeVarAdjustRelationPersistence(relation, nspid); + if (existing_relation_id != NULL) + *existing_relation_id = relid; + return nspid; } /* diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index c37301671e..d0843b2f58 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -451,10 +451,12 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId) /* * Look up the namespace in which we are supposed to create the relation, - * and check we have permission to create there. + * check we have permission to create there, lock it against concurrent + * drop, and mark stmt->relation as RELPERSISTENCE_TEMP if a temporary + * namespace is selected. */ - namespaceId = RangeVarGetAndCheckCreationNamespace(stmt->relation); - RangeVarAdjustRelationPersistence(stmt->relation, namespaceId); + namespaceId = + RangeVarGetAndCheckCreationNamespace(stmt->relation, NoLock, NULL); /* * Security check: disallow creating temp tables from security-restricted @@ -9417,6 +9419,7 @@ AlterTableNamespace(AlterObjectSchemaStmt *stmt) Oid oldNspOid; Oid nspOid; Relation classRel; + RangeVar *newrv; relid = RangeVarGetRelidExtended(stmt->relation, AccessExclusiveLock, false, false, @@ -9441,8 +9444,9 @@ AlterTableNamespace(AlterObjectSchemaStmt *stmt) get_rel_name(tableId)))); } - /* get schema OID and check its permissions */ - nspOid = LookupCreationNamespace(stmt->newschema); + /* Get and lock schema OID and check its permissions. */ + newrv = makeRangeVar(stmt->newschema, RelationGetRelationName(rel), -1); + nspOid = RangeVarGetAndCheckCreationNamespace(newrv, NoLock, NULL); /* common checks on switching namespaces */ CheckSetNamespace(oldNspOid, nspOid, RelationRelationId, relid); diff --git a/src/backend/commands/typecmds.c b/src/backend/commands/typecmds.c index 0f8af31fee..0043bf1fee 100644 --- a/src/backend/commands/typecmds.c +++ b/src/backend/commands/typecmds.c @@ -2005,7 +2005,8 @@ DefineCompositeType(const RangeVar *typevar, List *coldeflist) * check is here mainly to get a better error message about a "type" * instead of below about a "relation". */ - typeNamespace = RangeVarGetCreationNamespace(createStmt->relation); + typeNamespace = RangeVarGetAndCheckCreationNamespace(createStmt->relation, + NoLock, NULL); RangeVarAdjustRelationPersistence(createStmt->relation, typeNamespace); old_type_oid = GetSysCacheOid2(TYPENAMENSP, diff --git a/src/backend/commands/view.c b/src/backend/commands/view.c index ff9c44908a..c3520ae03c 100644 --- a/src/backend/commands/view.c +++ b/src/backend/commands/view.c @@ -98,10 +98,12 @@ isViewOnTempTable_walker(Node *node, void *context) *--------------------------------------------------------------------- */ static Oid -DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace, - Oid namespaceId, List *options) +DefineVirtualRelation(RangeVar *relation, List *tlist, bool replace, + List *options) { Oid viewOid; + Oid namespaceId; + LOCKMODE lockmode; CreateStmt *createStmt = makeNode(CreateStmt); List *attrList; ListCell *t; @@ -159,9 +161,14 @@ DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace, errmsg("view must have at least one column"))); /* - * Check to see if we want to replace an existing view. + * Look up, check permissions on, and lock the creation namespace; also + * check for a preexisting view with the same name. This will also set + * relation->relpersistence to RELPERSISTENCE_TEMP if the selected + * namespace is temporary. */ - viewOid = get_relname_relid(relation->relname, namespaceId); + lockmode = replace ? AccessExclusiveLock : NoLock; + namespaceId = + RangeVarGetAndCheckCreationNamespace(relation, lockmode, &viewOid); if (OidIsValid(viewOid) && replace) { @@ -170,24 +177,16 @@ DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace, List *atcmds = NIL; AlterTableCmd *atcmd; - /* - * Yes. Get exclusive lock on the existing view ... - */ - rel = relation_open(viewOid, AccessExclusiveLock); + /* Relation is already locked, but we must build a relcache entry. */ + rel = relation_open(viewOid, NoLock); - /* - * Make sure it *is* a view, and do permissions checks. - */ + /* Make sure it *is* a view. */ if (rel->rd_rel->relkind != RELKIND_VIEW) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("\"%s\" is not a view", RelationGetRelationName(rel)))); - if (!pg_class_ownercheck(viewOid, GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, - RelationGetRelationName(rel)); - /* Also check it's not in use already */ CheckTableNotInUse(rel, "CREATE OR REPLACE VIEW"); @@ -428,7 +427,6 @@ DefineView(ViewStmt *stmt, const char *queryString) { Query *viewParse; Oid viewOid; - Oid namespaceId; RangeVar *view; /* @@ -514,10 +512,6 @@ DefineView(ViewStmt *stmt, const char *queryString) view->relname))); } - /* Might also need to make it temporary if placed in temp schema. */ - namespaceId = RangeVarGetCreationNamespace(view); - RangeVarAdjustRelationPersistence(view, namespaceId); - /* * Create the view relation * @@ -525,7 +519,7 @@ DefineView(ViewStmt *stmt, const char *queryString) * aborted. */ viewOid = DefineVirtualRelation(view, viewParse->targetList, - stmt->replace, namespaceId, stmt->options); + stmt->replace, stmt->options); /* * The relation we have just created is not visible to any other commands diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 569d0ba7ad..422f737e82 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -2532,11 +2532,13 @@ OpenIntoRel(QueryDesc *queryDesc) } /* - * Find namespace to create in, check its permissions + * Find namespace to create in, check its permissions, lock it against + * concurrent drop, and mark into->rel as RELPERSISTENCE_TEMP if the + * selected namespace is temporary. */ intoName = into->rel->relname; - namespaceId = RangeVarGetAndCheckCreationNamespace(into->rel); - RangeVarAdjustRelationPersistence(into->rel, namespaceId); + namespaceId = RangeVarGetAndCheckCreationNamespace(into->rel, NoLock, + NULL); /* * Security check: disallow creating temp tables from security-restricted diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c index 335bdc6b07..99157c5349 100644 --- a/src/backend/parser/parse_utilcmd.c +++ b/src/backend/parser/parse_utilcmd.c @@ -146,6 +146,7 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString) List *save_alist; ListCell *elements; Oid namespaceid; + Oid existing_relid; /* * We must not scribble on the passed-in CreateStmt, so copy it. (This is @@ -155,30 +156,25 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString) /* * Look up the creation namespace. This also checks permissions on the - * target namespace, so that we throw any permissions error as early as - * possible. + * target namespace, locks it against concurrent drops, checks for a + * preexisting relation in that namespace with the same name, and updates + * stmt->relation->relpersistence if the select namespace is temporary. */ - namespaceid = RangeVarGetAndCheckCreationNamespace(stmt->relation); - RangeVarAdjustRelationPersistence(stmt->relation, namespaceid); + namespaceid = + RangeVarGetAndCheckCreationNamespace(stmt->relation, NoLock, + &existing_relid); /* * If the relation already exists and the user specified "IF NOT EXISTS", * bail out with a NOTICE. */ - if (stmt->if_not_exists) + if (stmt->if_not_exists && OidIsValid(existing_relid)) { - Oid existing_relid; - - existing_relid = get_relname_relid(stmt->relation->relname, - namespaceid); - if (existing_relid != InvalidOid) - { - ereport(NOTICE, - (errcode(ERRCODE_DUPLICATE_TABLE), - errmsg("relation \"%s\" already exists, skipping", - stmt->relation->relname))); - return NIL; - } + ereport(NOTICE, + (errcode(ERRCODE_DUPLICATE_TABLE), + errmsg("relation \"%s\" already exists, skipping", + stmt->relation->relname))); + return NIL; } /* diff --git a/src/include/catalog/namespace.h b/src/include/catalog/namespace.h index 37b259dc9f..fa3ba5bd10 100644 --- a/src/include/catalog/namespace.h +++ b/src/include/catalog/namespace.h @@ -58,7 +58,9 @@ extern Oid RangeVarGetRelidExtended(const RangeVar *relation, RangeVarGetRelidCallback callback, void *callback_arg); extern Oid RangeVarGetCreationNamespace(const RangeVar *newRelation); -extern Oid RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation); +extern Oid RangeVarGetAndCheckCreationNamespace(RangeVar *newRelation, + LOCKMODE lockmode, + Oid *existing_relation_id); extern void RangeVarAdjustRelationPersistence(RangeVar *newRelation, Oid nspid); extern Oid RelnameGetRelid(const char *relname); extern bool RelationIsVisible(Oid relid);