diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c index a9a64fe5f2..80d6fc7d0c 100644 --- a/src/backend/catalog/namespace.c +++ b/src/backend/catalog/namespace.c @@ -480,31 +480,131 @@ RangeVarGetCreationNamespace(const RangeVar *newRelation) /* * RangeVarGetAndCheckCreationNamespace - * As RangeVarGetCreationNamespace, but with a permissions check. + * + * This function returns the OID of the namespace in which a new relation + * with a given name should be created. If the user does not have CREATE + * permission on the target namespace, this function will instead signal + * an ERROR. + * + * If non-NULL, *existing_oid is set to the OID of any existing relation with + * the same name which already exists in that namespace, or to InvalidOid if + * no such relation exists. + * + * If lockmode != NoLock, the specified lock mode is acquire on the existing + * relation, if any, provided that the current user owns the target relation. + * However, if lockmode != NoLock and the user does not own the target + * relation, we throw an ERROR, as we must not try to lock relations the + * user does not have permissions on. + * + * As a side effect, this function acquires AccessShareLock on the target + * namespace. Without this, the namespace could be dropped before our + * transaction commits, leaving behind relations with relnamespace pointing + * to a no-longer-exstant namespace. + * + * As a further side-effect, if the select namespace is a temporary namespace, + * we mark the RangeVar as RELPERSISTENCE_TEMP. */ Oid -RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation) +RangeVarGetAndCheckCreationNamespace(RangeVar *relation, + LOCKMODE lockmode, + Oid *existing_relation_id) { - Oid namespaceId; - - namespaceId = RangeVarGetCreationNamespace(newRelation); + uint64 inval_count; + Oid relid; + Oid oldrelid = InvalidOid; + Oid nspid; + Oid oldnspid = InvalidOid; + bool retry = false; /* - * Check we have permission to create there. Skip check if bootstrapping, - * since permissions machinery may not be working yet. + * We check the catalog name and then ignore it. */ - if (!IsBootstrapProcessingMode()) + if (relation->catalogname) + { + if (strcmp(relation->catalogname, get_database_name(MyDatabaseId)) != 0) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cross-database references are not implemented: \"%s.%s.%s\"", + relation->catalogname, relation->schemaname, + relation->relname))); + } + + /* + * As in RangeVarGetRelidExtended(), we guard against concurrent DDL + * operations by tracking whether any invalidation messages are processed + * while we're doing the name lookups and acquiring locks. See comments + * in that function for a more detailed explanation of this logic. + */ + for (;;) { AclResult aclresult; - aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(), - ACL_CREATE); + inval_count = SharedInvalidMessageCounter; + + /* Look up creation namespace and check for existing relation. */ + nspid = RangeVarGetCreationNamespace(relation); + Assert(OidIsValid(nspid)); + if (existing_relation_id != NULL) + relid = get_relname_relid(relation->relname, nspid); + else + relid = InvalidOid; + + /* + * In bootstrap processing mode, we don't bother with permissions + * or locking. Permissions might not be working yet, and locking is + * unnecessary. + */ + if (IsBootstrapProcessingMode()) + break; + + /* Check namespace permissions. */ + aclresult = pg_namespace_aclcheck(nspid, GetUserId(), ACL_CREATE); if (aclresult != ACLCHECK_OK) aclcheck_error(aclresult, ACL_KIND_NAMESPACE, - get_namespace_name(namespaceId)); + get_namespace_name(nspid)); + + if (retry) + { + /* If nothing changed, we're done. */ + if (relid == oldrelid && nspid == oldnspid) + break; + /* If creation namespace has changed, give up old lock. */ + if (nspid != oldnspid) + UnlockDatabaseObject(NamespaceRelationId, oldnspid, 0, + AccessShareLock); + /* If name points to something different, give up old lock. */ + if (relid != oldrelid && OidIsValid(oldrelid) && lockmode != NoLock) + UnlockRelationOid(oldrelid, lockmode); + } + + /* Lock namespace. */ + if (nspid != oldnspid) + LockDatabaseObject(NamespaceRelationId, nspid, 0, AccessShareLock); + + /* Lock relation, if required if and we have permission. */ + if (lockmode != NoLock && OidIsValid(relid)) + { + if (!pg_class_ownercheck(relid, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, + relation->relname); + if (relid != oldrelid) + LockRelationOid(relid, lockmode); + } + + /* If no invalidation message were processed, we're done! */ + if (inval_count == SharedInvalidMessageCounter) + break; + + /* Something may have changed, so recheck our work. */ + retry = true; + oldrelid = relid; + oldnspid = nspid; } - return namespaceId; + RangeVarAdjustRelationPersistence(relation, nspid); + if (existing_relation_id != NULL) + *existing_relation_id = relid; + return nspid; } /* diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index c37301671e..d0843b2f58 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -451,10 +451,12 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId) /* * Look up the namespace in which we are supposed to create the relation, - * and check we have permission to create there. + * check we have permission to create there, lock it against concurrent + * drop, and mark stmt->relation as RELPERSISTENCE_TEMP if a temporary + * namespace is selected. */ - namespaceId = RangeVarGetAndCheckCreationNamespace(stmt->relation); - RangeVarAdjustRelationPersistence(stmt->relation, namespaceId); + namespaceId = + RangeVarGetAndCheckCreationNamespace(stmt->relation, NoLock, NULL); /* * Security check: disallow creating temp tables from security-restricted @@ -9417,6 +9419,7 @@ AlterTableNamespace(AlterObjectSchemaStmt *stmt) Oid oldNspOid; Oid nspOid; Relation classRel; + RangeVar *newrv; relid = RangeVarGetRelidExtended(stmt->relation, AccessExclusiveLock, false, false, @@ -9441,8 +9444,9 @@ AlterTableNamespace(AlterObjectSchemaStmt *stmt) get_rel_name(tableId)))); } - /* get schema OID and check its permissions */ - nspOid = LookupCreationNamespace(stmt->newschema); + /* Get and lock schema OID and check its permissions. */ + newrv = makeRangeVar(stmt->newschema, RelationGetRelationName(rel), -1); + nspOid = RangeVarGetAndCheckCreationNamespace(newrv, NoLock, NULL); /* common checks on switching namespaces */ CheckSetNamespace(oldNspOid, nspOid, RelationRelationId, relid); diff --git a/src/backend/commands/typecmds.c b/src/backend/commands/typecmds.c index 0f8af31fee..0043bf1fee 100644 --- a/src/backend/commands/typecmds.c +++ b/src/backend/commands/typecmds.c @@ -2005,7 +2005,8 @@ DefineCompositeType(const RangeVar *typevar, List *coldeflist) * check is here mainly to get a better error message about a "type" * instead of below about a "relation". */ - typeNamespace = RangeVarGetCreationNamespace(createStmt->relation); + typeNamespace = RangeVarGetAndCheckCreationNamespace(createStmt->relation, + NoLock, NULL); RangeVarAdjustRelationPersistence(createStmt->relation, typeNamespace); old_type_oid = GetSysCacheOid2(TYPENAMENSP, diff --git a/src/backend/commands/view.c b/src/backend/commands/view.c index ff9c44908a..c3520ae03c 100644 --- a/src/backend/commands/view.c +++ b/src/backend/commands/view.c @@ -98,10 +98,12 @@ isViewOnTempTable_walker(Node *node, void *context) *--------------------------------------------------------------------- */ static Oid -DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace, - Oid namespaceId, List *options) +DefineVirtualRelation(RangeVar *relation, List *tlist, bool replace, + List *options) { Oid viewOid; + Oid namespaceId; + LOCKMODE lockmode; CreateStmt *createStmt = makeNode(CreateStmt); List *attrList; ListCell *t; @@ -159,9 +161,14 @@ DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace, errmsg("view must have at least one column"))); /* - * Check to see if we want to replace an existing view. + * Look up, check permissions on, and lock the creation namespace; also + * check for a preexisting view with the same name. This will also set + * relation->relpersistence to RELPERSISTENCE_TEMP if the selected + * namespace is temporary. */ - viewOid = get_relname_relid(relation->relname, namespaceId); + lockmode = replace ? AccessExclusiveLock : NoLock; + namespaceId = + RangeVarGetAndCheckCreationNamespace(relation, lockmode, &viewOid); if (OidIsValid(viewOid) && replace) { @@ -170,24 +177,16 @@ DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace, List *atcmds = NIL; AlterTableCmd *atcmd; - /* - * Yes. Get exclusive lock on the existing view ... - */ - rel = relation_open(viewOid, AccessExclusiveLock); + /* Relation is already locked, but we must build a relcache entry. */ + rel = relation_open(viewOid, NoLock); - /* - * Make sure it *is* a view, and do permissions checks. - */ + /* Make sure it *is* a view. */ if (rel->rd_rel->relkind != RELKIND_VIEW) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("\"%s\" is not a view", RelationGetRelationName(rel)))); - if (!pg_class_ownercheck(viewOid, GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, - RelationGetRelationName(rel)); - /* Also check it's not in use already */ CheckTableNotInUse(rel, "CREATE OR REPLACE VIEW"); @@ -428,7 +427,6 @@ DefineView(ViewStmt *stmt, const char *queryString) { Query *viewParse; Oid viewOid; - Oid namespaceId; RangeVar *view; /* @@ -514,10 +512,6 @@ DefineView(ViewStmt *stmt, const char *queryString) view->relname))); } - /* Might also need to make it temporary if placed in temp schema. */ - namespaceId = RangeVarGetCreationNamespace(view); - RangeVarAdjustRelationPersistence(view, namespaceId); - /* * Create the view relation * @@ -525,7 +519,7 @@ DefineView(ViewStmt *stmt, const char *queryString) * aborted. */ viewOid = DefineVirtualRelation(view, viewParse->targetList, - stmt->replace, namespaceId, stmt->options); + stmt->replace, stmt->options); /* * The relation we have just created is not visible to any other commands diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 569d0ba7ad..422f737e82 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -2532,11 +2532,13 @@ OpenIntoRel(QueryDesc *queryDesc) } /* - * Find namespace to create in, check its permissions + * Find namespace to create in, check its permissions, lock it against + * concurrent drop, and mark into->rel as RELPERSISTENCE_TEMP if the + * selected namespace is temporary. */ intoName = into->rel->relname; - namespaceId = RangeVarGetAndCheckCreationNamespace(into->rel); - RangeVarAdjustRelationPersistence(into->rel, namespaceId); + namespaceId = RangeVarGetAndCheckCreationNamespace(into->rel, NoLock, + NULL); /* * Security check: disallow creating temp tables from security-restricted diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c index 335bdc6b07..99157c5349 100644 --- a/src/backend/parser/parse_utilcmd.c +++ b/src/backend/parser/parse_utilcmd.c @@ -146,6 +146,7 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString) List *save_alist; ListCell *elements; Oid namespaceid; + Oid existing_relid; /* * We must not scribble on the passed-in CreateStmt, so copy it. (This is @@ -155,30 +156,25 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString) /* * Look up the creation namespace. This also checks permissions on the - * target namespace, so that we throw any permissions error as early as - * possible. + * target namespace, locks it against concurrent drops, checks for a + * preexisting relation in that namespace with the same name, and updates + * stmt->relation->relpersistence if the select namespace is temporary. */ - namespaceid = RangeVarGetAndCheckCreationNamespace(stmt->relation); - RangeVarAdjustRelationPersistence(stmt->relation, namespaceid); + namespaceid = + RangeVarGetAndCheckCreationNamespace(stmt->relation, NoLock, + &existing_relid); /* * If the relation already exists and the user specified "IF NOT EXISTS", * bail out with a NOTICE. */ - if (stmt->if_not_exists) + if (stmt->if_not_exists && OidIsValid(existing_relid)) { - Oid existing_relid; - - existing_relid = get_relname_relid(stmt->relation->relname, - namespaceid); - if (existing_relid != InvalidOid) - { - ereport(NOTICE, - (errcode(ERRCODE_DUPLICATE_TABLE), - errmsg("relation \"%s\" already exists, skipping", - stmt->relation->relname))); - return NIL; - } + ereport(NOTICE, + (errcode(ERRCODE_DUPLICATE_TABLE), + errmsg("relation \"%s\" already exists, skipping", + stmt->relation->relname))); + return NIL; } /* diff --git a/src/include/catalog/namespace.h b/src/include/catalog/namespace.h index 37b259dc9f..fa3ba5bd10 100644 --- a/src/include/catalog/namespace.h +++ b/src/include/catalog/namespace.h @@ -58,7 +58,9 @@ extern Oid RangeVarGetRelidExtended(const RangeVar *relation, RangeVarGetRelidCallback callback, void *callback_arg); extern Oid RangeVarGetCreationNamespace(const RangeVar *newRelation); -extern Oid RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation); +extern Oid RangeVarGetAndCheckCreationNamespace(RangeVar *newRelation, + LOCKMODE lockmode, + Oid *existing_relation_id); extern void RangeVarAdjustRelationPersistence(RangeVar *newRelation, Oid nspid); extern Oid RelnameGetRelid(const char *relname); extern bool RelationIsVisible(Oid relid);