Improve behavior of concurrent rename statements.

Previously, renaming a table, sequence, view, index, foreign table,
column, or trigger checked permissions before locking the object, which
meant that if permissions were revoked during the lock wait, we would
still allow the operation.  Similarly, if the original object is dropped
and a new one with the same name is created, the operation will be allowed
if we had permissions on the old object; the permissions on the new
object don't matter.  All this is now fixed.

Along the way, attempting to rename a trigger on a foreign table now gives
the same error message as trying to create one there in the first place
(i.e. that it's not a table or view) rather than simply stating that no
trigger by that name exists.

Patch by me; review by Noah Misch.
This commit is contained in:
Robert Haas 2011-12-15 18:51:46 -05:00
parent d039fd51f7
commit 74a1d4fe7c
7 changed files with 201 additions and 150 deletions

View File

@ -105,62 +105,18 @@ ExecRenameStmt(RenameStmt *stmt)
case OBJECT_SEQUENCE:
case OBJECT_VIEW:
case OBJECT_INDEX:
case OBJECT_FOREIGN_TABLE:
RenameRelation(stmt);
break;
case OBJECT_COLUMN:
case OBJECT_ATTRIBUTE:
renameatt(stmt);
break;
case OBJECT_TRIGGER:
case OBJECT_FOREIGN_TABLE:
{
Oid relid;
CheckRelationOwnership(stmt->relation, true);
/*
* Lock level used here should match what will be taken later,
* in RenameRelation, renameatt, or renametrig.
*/
relid = RangeVarGetRelid(stmt->relation, AccessExclusiveLock,
false);
switch (stmt->renameType)
{
case OBJECT_TABLE:
case OBJECT_SEQUENCE:
case OBJECT_VIEW:
case OBJECT_INDEX:
case OBJECT_FOREIGN_TABLE:
{
/*
* RENAME TABLE requires that we (still) hold
* CREATE rights on the containing namespace, as
* well as ownership of the table.
*/
Oid namespaceId = get_rel_namespace(relid);
AclResult aclresult;
aclresult = pg_namespace_aclcheck(namespaceId,
GetUserId(),
ACL_CREATE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
get_namespace_name(namespaceId));
RenameRelation(relid, stmt->newname, stmt->renameType);
break;
}
case OBJECT_COLUMN:
case OBJECT_ATTRIBUTE:
renameatt(relid, stmt);
break;
case OBJECT_TRIGGER:
renametrig(relid,
stmt->subname, /* old att name */
stmt->newname); /* new att name */
break;
default:
/* can't happen */ ;
}
break;
}
renametrig(stmt);
break;
case OBJECT_TSPARSER:
RenameTSParser(stmt->object, stmt->newname);

View File

@ -1474,28 +1474,24 @@ finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
{
Relation toastrel;
Oid toastidx;
Oid toastnamespace;
char NewToastName[NAMEDATALEN];
toastrel = relation_open(newrel->rd_rel->reltoastrelid,
AccessShareLock);
toastidx = toastrel->rd_rel->reltoastidxid;
toastnamespace = toastrel->rd_rel->relnamespace;
relation_close(toastrel, AccessShareLock);
/* rename the toast table ... */
snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u",
OIDOldHeap);
RenameRelationInternal(newrel->rd_rel->reltoastrelid,
NewToastName,
toastnamespace);
NewToastName);
/* ... and its index too */
snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u_index",
OIDOldHeap);
RenameRelationInternal(toastidx,
NewToastName,
toastnamespace);
NewToastName);
}
relation_close(newrel, NoLock);
}

View File

@ -2064,6 +2064,48 @@ SetRelationHasSubclass(Oid relationId, bool relhassubclass)
heap_close(relationRelation, RowExclusiveLock);
}
/*
* renameatt_check - basic sanity checks before attribute rename
*/
static void
renameatt_check(Oid myrelid, Form_pg_class classform, bool recursing)
{
char relkind = classform->relkind;
if (classform->reloftype && !recursing)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot rename column of typed table")));
/*
* Renaming the columns of sequences or toast tables doesn't actually
* break anything from the system's point of view, since internal
* references are by attnum. But it doesn't seem right to allow users to
* change names that are hardcoded into the system, hence the following
* restriction.
*/
if (relkind != RELKIND_RELATION &&
relkind != RELKIND_VIEW &&
relkind != RELKIND_COMPOSITE_TYPE &&
relkind != RELKIND_INDEX &&
relkind != RELKIND_FOREIGN_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a table, view, composite type, index or foreign table",
NameStr(classform->relname))));
/*
* permissions checking. only the owner of a class can change its schema.
*/
if (!pg_class_ownercheck(myrelid, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
NameStr(classform->relname));
if (!allowSystemTableMods && IsSystemClass(classform))
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied: \"%s\" is a system catalog",
NameStr(classform->relname))));
}
/*
* renameatt_internal - workhorse for renameatt
@ -2082,48 +2124,13 @@ renameatt_internal(Oid myrelid,
HeapTuple atttup;
Form_pg_attribute attform;
int attnum;
char relkind;
/*
* Grab an exclusive lock on the target table, which we will NOT release
* until end of transaction.
*/
targetrelation = relation_open(myrelid, AccessExclusiveLock);
if (targetrelation->rd_rel->reloftype && !recursing)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot rename column of typed table")));
/*
* Renaming the columns of sequences or toast tables doesn't actually
* break anything from the system's point of view, since internal
* references are by attnum. But it doesn't seem right to allow users to
* change names that are hardcoded into the system, hence the following
* restriction.
*/
relkind = RelationGetForm(targetrelation)->relkind;
if (relkind != RELKIND_RELATION &&
relkind != RELKIND_VIEW &&
relkind != RELKIND_COMPOSITE_TYPE &&
relkind != RELKIND_INDEX &&
relkind != RELKIND_FOREIGN_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a table, view, composite type, index or foreign table",
RelationGetRelationName(targetrelation))));
/*
* permissions checking. only the owner of a class can change its schema.
*/
if (!pg_class_ownercheck(myrelid, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
RelationGetRelationName(targetrelation));
if (!allowSystemTableMods && IsSystemRelation(targetrelation))
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied: \"%s\" is a system catalog",
RelationGetRelationName(targetrelation))));
renameatt_check(myrelid, RelationGetForm(targetrelation), recursing);
/*
* if the 'recurse' flag is set then we are supposed to rename this
@ -2252,14 +2259,38 @@ renameatt_internal(Oid myrelid,
relation_close(targetrelation, NoLock); /* close rel but keep lock */
}
/*
* Perform permissions and integrity checks before acquiring a relation lock.
*/
static void
RangeVarCallbackForRenameAttribute(const RangeVar *rv, Oid relid, Oid oldrelid,
void *arg)
{
HeapTuple tuple;
Form_pg_class form;
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
if (!HeapTupleIsValid(tuple))
return; /* concurrently dropped */
form = (Form_pg_class) GETSTRUCT(tuple);
renameatt_check(relid, form, false);
ReleaseSysCache(tuple);
}
/*
* renameatt - changes the name of a attribute in a relation
*/
void
renameatt(Oid myrelid, RenameStmt *stmt)
renameatt(RenameStmt *stmt)
{
renameatt_internal(myrelid,
Oid relid;
/* lock level taken here should match renameatt_internal */
relid = RangeVarGetRelidExtended(stmt->relation, AccessExclusiveLock,
false, false,
RangeVarCallbackForRenameAttribute,
NULL);
renameatt_internal(relid,
stmt->subname, /* old att name */
stmt->newname, /* new att name */
interpretInhOption(stmt->relation->inhOpt), /* recursive? */
@ -2268,29 +2299,44 @@ renameatt(Oid myrelid, RenameStmt *stmt)
stmt->behavior);
}
/*
* Execute ALTER TABLE/INDEX/SEQUENCE/VIEW/FOREIGN TABLE RENAME
*
* Caller has already done permissions checks.
* Perform permissions and integrity checks before acquiring a relation lock.
*/
void
RenameRelation(Oid myrelid, const char *newrelname, ObjectType reltype)
static void
RangeVarCallbackForRenameRelation(const RangeVar *rv, Oid relid, Oid oldrelid,
void *arg)
{
Relation targetrelation;
Oid namespaceId;
char relkind;
RenameStmt *stmt = (RenameStmt *) arg;
ObjectType reltype;
HeapTuple tuple;
Form_pg_class classform;
AclResult aclresult;
char relkind;
/*
* Grab an exclusive lock on the target table, index, sequence or view,
* which we will NOT release until end of transaction.
*
* Lock level used here should match ExecRenameStmt
*/
targetrelation = relation_open(myrelid, AccessExclusiveLock);
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
if (!HeapTupleIsValid(tuple))
return; /* concurrently dropped */
classform = (Form_pg_class) GETSTRUCT(tuple);
relkind = classform->relkind;
namespaceId = RelationGetNamespace(targetrelation);
relkind = targetrelation->rd_rel->relkind;
/* Must own table. */
if (!pg_class_ownercheck(relid, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
NameStr(classform->relname));
/* No system table modifications unless explicitly allowed. */
if (!allowSystemTableMods && IsSystemClass(classform))
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied: \"%s\" is a system catalog",
NameStr(classform->relname))));
/* Must (still) have CREATE rights on containing namespace. */
aclresult = pg_namespace_aclcheck(classform->relnamespace, GetUserId(),
ACL_CREATE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
get_namespace_name(classform->relnamespace));
/*
* For compatibility with prior releases, we don't complain if ALTER TABLE
@ -2298,23 +2344,21 @@ RenameRelation(Oid myrelid, const char *newrelname, ObjectType reltype)
* ALTER SEQUENCE/VIEW/FOREIGN TABLE are only to be used with relations of
* that type.
*/
reltype = stmt->renameType;
if (reltype == OBJECT_SEQUENCE && relkind != RELKIND_SEQUENCE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a sequence",
RelationGetRelationName(targetrelation))));
errmsg("\"%s\" is not a sequence", rv->relname)));
if (reltype == OBJECT_VIEW && relkind != RELKIND_VIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a view",
RelationGetRelationName(targetrelation))));
errmsg("\"%s\" is not a view", rv->relname)));
if (reltype == OBJECT_FOREIGN_TABLE && relkind != RELKIND_FOREIGN_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a foreign table",
RelationGetRelationName(targetrelation))));
errmsg("\"%s\" is not a foreign table", rv->relname)));
/*
* Don't allow ALTER TABLE on composite types. We want people to use ALTER
@ -2323,17 +2367,35 @@ RenameRelation(Oid myrelid, const char *newrelname, ObjectType reltype)
if (relkind == RELKIND_COMPOSITE_TYPE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is a composite type",
RelationGetRelationName(targetrelation)),
errmsg("\"%s\" is a composite type", rv->relname),
errhint("Use ALTER TYPE instead.")));
/* Do the work */
RenameRelationInternal(myrelid, newrelname, namespaceId);
ReleaseSysCache(tuple);
}
/*
* Execute ALTER TABLE/INDEX/SEQUENCE/VIEW/FOREIGN TABLE RENAME
*/
void
RenameRelation(RenameStmt *stmt)
{
Oid relid;
/*
* Close rel, but keep exclusive lock!
* Grab an exclusive lock on the target table, index, sequence or view,
* which we will NOT release until end of transaction.
*
* Lock level used here should match RenameRelationInternal, to avoid
* lock escalation.
*/
relation_close(targetrelation, NoLock);
relid = RangeVarGetRelidExtended(stmt->relation, AccessExclusiveLock,
false, false,
RangeVarCallbackForRenameRelation,
(void *) stmt);
/* Do the work */
RenameRelationInternal(relid, stmt->newname);
}
/*
@ -2346,18 +2408,20 @@ RenameRelation(Oid myrelid, const char *newrelname, ObjectType reltype)
* sequence, AFAIK there's no need for it to be there.
*/
void
RenameRelationInternal(Oid myrelid, const char *newrelname, Oid namespaceId)
RenameRelationInternal(Oid myrelid, const char *newrelname)
{
Relation targetrelation;
Relation relrelation; /* for RELATION relation */
HeapTuple reltup;
Form_pg_class relform;
Oid namespaceId;
/*
* Grab an exclusive lock on the target table, index, sequence or view,
* which we will NOT release until end of transaction.
*/
targetrelation = relation_open(myrelid, AccessExclusiveLock);
namespaceId = RelationGetNamespace(targetrelation);
/*
* Find relation's pg_class tuple, and make sure newrelname isn't in use.
@ -5376,7 +5440,7 @@ ATExecAddIndexConstraint(AlteredTableInfo *tab, Relation rel,
ereport(NOTICE,
(errmsg("ALTER TABLE / ADD CONSTRAINT USING INDEX will rename index \"%s\" to \"%s\"",
indexName, constraintName)));
RenameRelation(index_oid, constraintName, OBJECT_INDEX);
RenameRelationInternal(index_oid, constraintName);
}
/* Extra checks needed if making primary key */

View File

@ -1151,6 +1151,39 @@ get_trigger_oid(Oid relid, const char *trigname, bool missing_ok)
return oid;
}
/*
* Perform permissions and integrity checks before acquiring a relation lock.
*/
static void
RangeVarCallbackForRenameTrigger(const RangeVar *rv, Oid relid, Oid oldrelid,
void *arg)
{
HeapTuple tuple;
Form_pg_class form;
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
if (!HeapTupleIsValid(tuple))
return; /* concurrently dropped */
form = (Form_pg_class) GETSTRUCT(tuple);
/* only tables and views can have triggers */
if (form->relkind != RELKIND_RELATION && form->relkind != RELKIND_VIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a table or view", rv->relname)));
/* you must own the table to rename one of its triggers */
if (!pg_class_ownercheck(relid, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, rv->relname);
if (!allowSystemTableMods && IsSystemClass(form))
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied: \"%s\" is a system catalog",
rv->relname)));
ReleaseSysCache(tuple);
}
/*
* renametrig - changes the name of a trigger on a relation
*
@ -1165,21 +1198,26 @@ get_trigger_oid(Oid relid, const char *trigname, bool missing_ok)
* update row in catalog
*/
void
renametrig(Oid relid,
const char *oldname,
const char *newname)
renametrig(RenameStmt *stmt)
{
Relation targetrel;
Relation tgrel;
HeapTuple tuple;
SysScanDesc tgscan;
ScanKeyData key[2];
Oid relid;
/*
* Grab an exclusive lock on the target table, which we will NOT release
* until end of transaction.
* Look up name, check permissions, and acquire lock (which we will NOT
* release until end of transaction).
*/
targetrel = heap_open(relid, AccessExclusiveLock);
relid = RangeVarGetRelidExtended(stmt->relation, AccessExclusiveLock,
false, false,
RangeVarCallbackForRenameTrigger,
NULL);
/* Have lock already, so just need to build relcache entry. */
targetrel = relation_open(relid, NoLock);
/*
* Scan pg_trigger twice for existing triggers on relation. We do this in
@ -1202,14 +1240,14 @@ renametrig(Oid relid,
ScanKeyInit(&key[1],
Anum_pg_trigger_tgname,
BTEqualStrategyNumber, F_NAMEEQ,
PointerGetDatum(newname));
PointerGetDatum(stmt->newname));
tgscan = systable_beginscan(tgrel, TriggerRelidNameIndexId, true,
SnapshotNow, 2, key);
if (HeapTupleIsValid(tuple = systable_getnext(tgscan)))
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("trigger \"%s\" for relation \"%s\" already exists",
newname, RelationGetRelationName(targetrel))));
stmt->newname, RelationGetRelationName(targetrel))));
systable_endscan(tgscan);
/*
@ -1222,7 +1260,7 @@ renametrig(Oid relid,
ScanKeyInit(&key[1],
Anum_pg_trigger_tgname,
BTEqualStrategyNumber, F_NAMEEQ,
PointerGetDatum(oldname));
PointerGetDatum(stmt->subname));
tgscan = systable_beginscan(tgrel, TriggerRelidNameIndexId, true,
SnapshotNow, 2, key);
if (HeapTupleIsValid(tuple = systable_getnext(tgscan)))
@ -1232,7 +1270,8 @@ renametrig(Oid relid,
*/
tuple = heap_copytuple(tuple); /* need a modifiable copy */
namestrcpy(&((Form_pg_trigger) GETSTRUCT(tuple))->tgname, newname);
namestrcpy(&((Form_pg_trigger) GETSTRUCT(tuple))->tgname,
stmt->newname);
simple_heap_update(tgrel, &tuple->t_self, tuple);
@ -1251,7 +1290,7 @@ renametrig(Oid relid,
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("trigger \"%s\" for table \"%s\" does not exist",
oldname, RelationGetRelationName(targetrel))));
stmt->subname, RelationGetRelationName(targetrel))));
}
systable_endscan(tgscan);
@ -1261,7 +1300,7 @@ renametrig(Oid relid,
/*
* Close rel, but keep exclusive lock!
*/
heap_close(targetrel, NoLock);
relation_close(targetrel, NoLock);
}

View File

@ -3121,8 +3121,7 @@ RenameType(List *names, const char *newTypeName)
* RenameRelationInternal will call RenameTypeInternal automatically.
*/
if (typTup->typtype == TYPTYPE_COMPOSITE)
RenameRelationInternal(typTup->typrelid, newTypeName,
typTup->typnamespace);
RenameRelationInternal(typTup->typrelid, newTypeName);
else
RenameTypeInternal(typeOid, newTypeName,
typTup->typnamespace);

View File

@ -45,15 +45,12 @@ extern void ExecuteTruncate(TruncateStmt *stmt);
extern void SetRelationHasSubclass(Oid relationId, bool relhassubclass);
extern void renameatt(Oid myrelid, RenameStmt *stmt);
extern void renameatt(RenameStmt *stmt);
extern void RenameRelation(Oid myrelid,
const char *newrelname,
ObjectType reltype);
extern void RenameRelation(RenameStmt *stmt);
extern void RenameRelationInternal(Oid myrelid,
const char *newrelname,
Oid namespaceId);
const char *newrelname);
extern void find_composite_type_dependencies(Oid typeOid,
Relation origRelation,

View File

@ -115,7 +115,7 @@ extern Oid CreateTrigger(CreateTrigStmt *stmt, const char *queryString,
extern void RemoveTriggerById(Oid trigOid);
extern Oid get_trigger_oid(Oid relid, const char *name, bool missing_ok);
extern void renametrig(Oid relid, const char *oldname, const char *newname);
extern void renametrig(RenameStmt *stmt);
extern void EnableDisableTrigger(Relation rel, const char *tgname,
char fires_when, bool skip_system);