Fix ALTER EXTENSION / SET SCHEMA

In its original conception, it was leaving some objects into the old
schema, but without their proper pg_depend entries; this meant that the
old schema could be dropped, causing future pg_dump calls to fail on the
affected database.  This was originally reported by Jeff Frost as #6704;
there have been other complaints elsewhere that can probably be traced
to this bug.

To fix, be more consistent about altering a table's subsidiary objects
along the table itself; this requires some restructuring in how tables
are relocated when altering an extension -- hence the new
AlterTableNamespaceInternal routine which encapsulates it for both the
ALTER TABLE and the ALTER EXTENSION cases.

There was another bug lurking here, which was unmasked after fixing the
previous one: certain objects would be reached twice via the dependency
graph, and the second attempt to move them would cause the entire
operation to fail.  Per discussion, it seems the best fix for this is to
do more careful tracking of objects already moved: we now maintain a
list of moved objects, to avoid attempting to do it twice for the same
object.

Authors: Alvaro Herrera, Dimitri Fontaine
Reviewed by Tom Lane
This commit is contained in:
Alvaro Herrera 2012-10-31 10:52:55 -03:00
parent 4af3dda136
commit 04f28bdb84
9 changed files with 148 additions and 67 deletions

View File

@ -679,7 +679,7 @@ RenameConstraintById(Oid conId, const char *newname)
*/
void
AlterConstraintNamespaces(Oid ownerId, Oid oldNspId,
Oid newNspId, bool isType)
Oid newNspId, bool isType, ObjectAddresses *objsMoved)
{
Relation conRel;
ScanKeyData key[1];
@ -712,6 +712,14 @@ AlterConstraintNamespaces(Oid ownerId, Oid oldNspId,
while (HeapTupleIsValid((tup = systable_getnext(scan))))
{
Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(tup);
ObjectAddress thisobj;
thisobj.classId = ConstraintRelationId;
thisobj.objectId = HeapTupleGetOid(tup);
thisobj.objectSubId = 0;
if (object_address_present(&thisobj, objsMoved))
continue;
if (conform->connamespace == oldNspId)
{
@ -729,6 +737,8 @@ AlterConstraintNamespaces(Oid ownerId, Oid oldNspId,
* changeDependencyFor().
*/
}
add_exact_object_address(&thisobj, objsMoved);
}
systable_endscan(scan);

View File

@ -252,7 +252,8 @@ ExecAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt)
* object doesn't have a schema.
*/
Oid
AlterObjectNamespace_oid(Oid classId, Oid objid, Oid nspOid)
AlterObjectNamespace_oid(Oid classId, Oid objid, Oid nspOid,
ObjectAddresses *objsMoved)
{
Oid oldNspOid = InvalidOid;
ObjectAddress dep;
@ -266,20 +267,11 @@ AlterObjectNamespace_oid(Oid classId, Oid objid, Oid nspOid)
case OCLASS_CLASS:
{
Relation rel;
Relation classRel;
rel = relation_open(objid, AccessExclusiveLock);
oldNspOid = RelationGetNamespace(rel);
classRel = heap_open(RelationRelationId, RowExclusiveLock);
AlterRelationNamespaceInternal(classRel,
objid,
oldNspOid,
nspOid,
true);
heap_close(classRel, RowExclusiveLock);
AlterTableNamespaceInternal(rel, oldNspOid, nspOid, objsMoved);
relation_close(rel, NoLock);
break;
@ -290,7 +282,7 @@ AlterObjectNamespace_oid(Oid classId, Oid objid, Oid nspOid)
break;
case OCLASS_TYPE:
oldNspOid = AlterTypeNamespace_oid(objid, nspOid);
oldNspOid = AlterTypeNamespace_oid(objid, nspOid, objsMoved);
break;
case OCLASS_COLLATION:

View File

@ -2204,6 +2204,7 @@ AlterExtensionNamespace(List *names, const char *newschema)
Relation depRel;
SysScanDesc depScan;
HeapTuple depTup;
ObjectAddresses *objsMoved;
if (list_length(names) != 1)
ereport(ERROR,
@ -2278,6 +2279,8 @@ AlterExtensionNamespace(List *names, const char *newschema)
errmsg("extension \"%s\" does not support SET SCHEMA",
NameStr(extForm->extname))));
objsMoved = new_object_addresses();
/*
* Scan pg_depend to find objects that depend directly on the extension,
* and alter each one's schema.
@ -2317,9 +2320,11 @@ AlterExtensionNamespace(List *names, const char *newschema)
if (dep.objectSubId != 0) /* should not happen */
elog(ERROR, "extension should not have a sub-object dependency");
/* Relocate the object */
dep_oldNspOid = AlterObjectNamespace_oid(dep.classId,
dep.objectId,
nspOid);
nspOid,
objsMoved);
/*
* Remember previous namespace of first object that has one

View File

@ -261,10 +261,10 @@ static void StoreCatalogInheritance1(Oid relationId, Oid parentOid,
int16 seqNumber, Relation inhRelation);
static int findAttrByName(const char *attributeName, List *schema);
static void AlterIndexNamespaces(Relation classRel, Relation rel,
Oid oldNspOid, Oid newNspOid);
Oid oldNspOid, Oid newNspOid, ObjectAddresses *objsMoved);
static void AlterSeqNamespaces(Relation classRel, Relation rel,
Oid oldNspOid, Oid newNspOid,
const char *newNspName, LOCKMODE lockmode);
Oid oldNspOid, Oid newNspOid, ObjectAddresses *objsMoved,
LOCKMODE lockmode);
static void ATExecValidateConstraint(Relation rel, char *constrName,
bool recurse, bool recursing, LOCKMODE lockmode);
static int transformColumnNameList(Oid relId, List *colList,
@ -9711,8 +9711,8 @@ AlterTableNamespace(AlterObjectSchemaStmt *stmt)
Oid relid;
Oid oldNspOid;
Oid nspOid;
Relation classRel;
RangeVar *newrv;
ObjectAddresses *objsMoved;
relid = RangeVarGetRelidExtended(stmt->relation, AccessExclusiveLock,
stmt->missing_ok, false,
@ -9753,27 +9753,47 @@ AlterTableNamespace(AlterObjectSchemaStmt *stmt)
/* common checks on switching namespaces */
CheckSetNamespace(oldNspOid, nspOid, RelationRelationId, relid);
objsMoved = new_object_addresses();
AlterTableNamespaceInternal(rel, oldNspOid, nspOid, objsMoved);
free_object_addresses(objsMoved);
/* close rel, but keep lock until commit */
relation_close(rel, NoLock);
}
/*
* The guts of relocating a table to another namespace: besides moving
* the table itself, its dependent objects are relocated to the new schema.
*/
void
AlterTableNamespaceInternal(Relation rel, Oid oldNspOid, Oid nspOid,
ObjectAddresses *objsMoved)
{
Relation classRel;
Assert(objsMoved != NULL);
/* OK, modify the pg_class row and pg_depend entry */
classRel = heap_open(RelationRelationId, RowExclusiveLock);
AlterRelationNamespaceInternal(classRel, relid, oldNspOid, nspOid, true);
AlterRelationNamespaceInternal(classRel, RelationGetRelid(rel), oldNspOid,
nspOid, true, objsMoved);
/* Fix the table's row type too */
AlterTypeNamespaceInternal(rel->rd_rel->reltype, nspOid, false, false);
AlterTypeNamespaceInternal(rel->rd_rel->reltype,
nspOid, false, false, objsMoved);
/* Fix other dependent stuff */
if (rel->rd_rel->relkind == RELKIND_RELATION)
{
AlterIndexNamespaces(classRel, rel, oldNspOid, nspOid);
AlterSeqNamespaces(classRel, rel, oldNspOid, nspOid, stmt->newschema,
AccessExclusiveLock);
AlterConstraintNamespaces(relid, oldNspOid, nspOid, false);
AlterIndexNamespaces(classRel, rel, oldNspOid, nspOid, objsMoved);
AlterSeqNamespaces(classRel, rel, oldNspOid, nspOid,
objsMoved, AccessExclusiveLock);
AlterConstraintNamespaces(RelationGetRelid(rel), oldNspOid, nspOid,
false, objsMoved);
}
heap_close(classRel, RowExclusiveLock);
/* close rel, but keep lock until commit */
relation_close(rel, NoLock);
}
/*
@ -9784,10 +9804,11 @@ AlterTableNamespace(AlterObjectSchemaStmt *stmt)
void
AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
Oid oldNspOid, Oid newNspOid,
bool hasDependEntry)
bool hasDependEntry, ObjectAddresses *objsMoved)
{
HeapTuple classTup;
Form_pg_class classForm;
ObjectAddress thisobj;
classTup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relOid));
if (!HeapTupleIsValid(classTup))
@ -9796,27 +9817,39 @@ AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
Assert(classForm->relnamespace == oldNspOid);
/* check for duplicate name (more friendly than unique-index failure) */
if (get_relname_relid(NameStr(classForm->relname),
newNspOid) != InvalidOid)
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_TABLE),
errmsg("relation \"%s\" already exists in schema \"%s\"",
NameStr(classForm->relname),
get_namespace_name(newNspOid))));
thisobj.classId = RelationRelationId;
thisobj.objectId = relOid;
thisobj.objectSubId = 0;
/* classTup is a copy, so OK to scribble on */
classForm->relnamespace = newNspOid;
/*
* Do nothing when there's nothing to do.
*/
if (!object_address_present(&thisobj, objsMoved))
{
/* check for duplicate name (more friendly than unique-index failure) */
if (get_relname_relid(NameStr(classForm->relname),
newNspOid) != InvalidOid)
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_TABLE),
errmsg("relation \"%s\" already exists in schema \"%s\"",
NameStr(classForm->relname),
get_namespace_name(newNspOid))));
simple_heap_update(classRel, &classTup->t_self, classTup);
CatalogUpdateIndexes(classRel, classTup);
/* classTup is a copy, so OK to scribble on */
classForm->relnamespace = newNspOid;
/* Update dependency on schema if caller said so */
if (hasDependEntry &&
changeDependencyFor(RelationRelationId, relOid,
NamespaceRelationId, oldNspOid, newNspOid) != 1)
elog(ERROR, "failed to change schema dependency for relation \"%s\"",
NameStr(classForm->relname));
simple_heap_update(classRel, &classTup->t_self, classTup);
CatalogUpdateIndexes(classRel, classTup);
/* Update dependency on schema if caller said so */
if (hasDependEntry &&
changeDependencyFor(RelationRelationId, relOid,
NamespaceRelationId, oldNspOid, newNspOid) != 1)
elog(ERROR, "failed to change schema dependency for relation \"%s\"",
NameStr(classForm->relname));
add_exact_object_address(&thisobj, objsMoved);
}
heap_freetuple(classTup);
}
@ -9829,7 +9862,7 @@ AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
*/
static void
AlterIndexNamespaces(Relation classRel, Relation rel,
Oid oldNspOid, Oid newNspOid)
Oid oldNspOid, Oid newNspOid, ObjectAddresses *objsMoved)
{
List *indexList;
ListCell *l;
@ -9839,15 +9872,27 @@ AlterIndexNamespaces(Relation classRel, Relation rel,
foreach(l, indexList)
{
Oid indexOid = lfirst_oid(l);
ObjectAddress thisobj;
thisobj.classId = RelationRelationId;
thisobj.objectId = indexOid;
thisobj.objectSubId = 0;
/*
* Note: currently, the index will not have its own dependency on the
* namespace, so we don't need to do changeDependencyFor(). There's no
* row type in pg_type, either.
*
* XXX this objsMoved test may be pointless -- surely we have a single
* dependency link from a relation to each index?
*/
AlterRelationNamespaceInternal(classRel, indexOid,
oldNspOid, newNspOid,
false);
if (!object_address_present(&thisobj, objsMoved))
{
AlterRelationNamespaceInternal(classRel, indexOid,
oldNspOid, newNspOid,
false, objsMoved);
add_exact_object_address(&thisobj, objsMoved);
}
}
list_free(indexList);
@ -9862,7 +9907,8 @@ AlterIndexNamespaces(Relation classRel, Relation rel,
*/
static void
AlterSeqNamespaces(Relation classRel, Relation rel,
Oid oldNspOid, Oid newNspOid, const char *newNspName, LOCKMODE lockmode)
Oid oldNspOid, Oid newNspOid, ObjectAddresses *objsMoved,
LOCKMODE lockmode)
{
Relation depRel;
SysScanDesc scan;
@ -9914,14 +9960,14 @@ AlterSeqNamespaces(Relation classRel, Relation rel,
/* Fix the pg_class and pg_depend entries */
AlterRelationNamespaceInternal(classRel, depForm->objid,
oldNspOid, newNspOid,
true);
true, objsMoved);
/*
* Sequences have entries in pg_type. We need to be careful to move
* them to the new namespace, too.
*/
AlterTypeNamespaceInternal(RelationGetForm(seqRel)->reltype,
newNspOid, false, false);
newNspOid, false, false, objsMoved);
/* Now we can close it. Keep the lock till end of transaction. */
relation_close(seqRel, NoLock);

View File

@ -3322,6 +3322,7 @@ AlterTypeNamespace(List *names, const char *newschema, ObjectType objecttype)
TypeName *typename;
Oid typeOid;
Oid nspOid;
ObjectAddresses *objsMoved;
/* Make a TypeName so we can use standard type lookup machinery */
typename = makeTypeNameFromNameList(names);
@ -3337,11 +3338,13 @@ AlterTypeNamespace(List *names, const char *newschema, ObjectType objecttype)
/* get schema OID and check its permissions */
nspOid = LookupCreationNamespace(newschema);
AlterTypeNamespace_oid(typeOid, nspOid);
objsMoved = new_object_addresses();
AlterTypeNamespace_oid(typeOid, nspOid, objsMoved);
free_object_addresses(objsMoved);
}
Oid
AlterTypeNamespace_oid(Oid typeOid, Oid nspOid)
AlterTypeNamespace_oid(Oid typeOid, Oid nspOid, ObjectAddresses *objsMoved)
{
Oid elemOid;
@ -3360,7 +3363,7 @@ AlterTypeNamespace_oid(Oid typeOid, Oid nspOid)
format_type_be(elemOid))));
/* and do the work */
return AlterTypeNamespaceInternal(typeOid, nspOid, false, true);
return AlterTypeNamespaceInternal(typeOid, nspOid, false, true, objsMoved);
}
/*
@ -3381,7 +3384,8 @@ AlterTypeNamespace_oid(Oid typeOid, Oid nspOid)
Oid
AlterTypeNamespaceInternal(Oid typeOid, Oid nspOid,
bool isImplicitArray,
bool errorOnTableType)
bool errorOnTableType,
ObjectAddresses *objsMoved)
{
Relation rel;
HeapTuple tup;
@ -3389,6 +3393,17 @@ AlterTypeNamespaceInternal(Oid typeOid, Oid nspOid,
Oid oldNspOid;
Oid arrayOid;
bool isCompositeType;
ObjectAddress thisobj;
/*
* Make sure we haven't moved this object previously.
*/
thisobj.classId = TypeRelationId;
thisobj.objectId = typeOid;
thisobj.objectSubId = 0;
if (object_address_present(&thisobj, objsMoved))
return InvalidOid;
rel = heap_open(TypeRelationId, RowExclusiveLock);
@ -3449,7 +3464,7 @@ AlterTypeNamespaceInternal(Oid typeOid, Oid nspOid,
AlterRelationNamespaceInternal(classRel, typform->typrelid,
oldNspOid, nspOid,
false);
false, objsMoved);
heap_close(classRel, RowExclusiveLock);
@ -3458,13 +3473,14 @@ AlterTypeNamespaceInternal(Oid typeOid, Oid nspOid,
* currently support this, but probably will someday).
*/
AlterConstraintNamespaces(typform->typrelid, oldNspOid,
nspOid, false);
nspOid, false, objsMoved);
}
else
{
/* If it's a domain, it might have constraints */
if (typform->typtype == TYPTYPE_DOMAIN)
AlterConstraintNamespaces(typeOid, oldNspOid, nspOid, true);
AlterConstraintNamespaces(typeOid, oldNspOid, nspOid, true,
objsMoved);
}
/*
@ -3482,9 +3498,11 @@ AlterTypeNamespaceInternal(Oid typeOid, Oid nspOid,
heap_close(rel, RowExclusiveLock);
add_exact_object_address(&thisobj, objsMoved);
/* Recursively alter the associated array type, if any */
if (OidIsValid(arrayOid))
AlterTypeNamespaceInternal(arrayOid, nspOid, true, true);
AlterTypeNamespaceInternal(arrayOid, nspOid, true, true, objsMoved);
return oldNspOid;
}

View File

@ -20,6 +20,7 @@
#define PG_CONSTRAINT_H
#include "catalog/genbki.h"
#include "catalog/dependency.h"
#include "nodes/pg_list.h"
/* ----------------
@ -244,7 +245,7 @@ extern char *ChooseConstraintName(const char *name1, const char *name2,
List *others);
extern void AlterConstraintNamespaces(Oid ownerId, Oid oldNspId,
Oid newNspId, bool isType);
Oid newNspId, bool isType, ObjectAddresses *objsMoved);
extern Oid get_relation_constraint_oid(Oid relid, const char *conname, bool missing_ok);
extern Oid get_domain_constraint_oid(Oid typid, const char *conname, bool missing_ok);

View File

@ -14,13 +14,15 @@
#ifndef ALTER_H
#define ALTER_H
#include "catalog/dependency.h"
#include "nodes/parsenodes.h"
#include "utils/relcache.h"
extern void ExecRenameStmt(RenameStmt *stmt);
extern void ExecAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt);
extern Oid AlterObjectNamespace_oid(Oid classId, Oid objid, Oid nspOid);
extern Oid AlterObjectNamespace_oid(Oid classId, Oid objid, Oid nspOid,
ObjectAddresses *objsMoved);
extern Oid AlterObjectNamespace_internal(Relation rel, Oid objid, Oid nspOid);
extern void ExecAlterOwnerStmt(AlterOwnerStmt *stmt);

View File

@ -15,6 +15,7 @@
#define TABLECMDS_H
#include "access/htup.h"
#include "catalog/dependency.h"
#include "nodes/parsenodes.h"
#include "storage/lock.h"
#include "utils/relcache.h"
@ -36,9 +37,13 @@ extern void AlterTableInternal(Oid relid, List *cmds, bool recurse);
extern void AlterTableNamespace(AlterObjectSchemaStmt *stmt);
extern void AlterTableNamespaceInternal(Relation rel, Oid oldNspOid,
Oid nspOid, ObjectAddresses *objsMoved);
extern void AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
Oid oldNspOid, Oid newNspOid,
bool hasDependEntry);
bool hasDependEntry,
ObjectAddresses *objsMoved);
extern void CheckTableNotInUse(Relation rel, const char *stmt);

View File

@ -15,6 +15,7 @@
#define TYPECMDS_H
#include "access/htup.h"
#include "catalog/dependency.h"
#include "nodes/parsenodes.h"
@ -45,9 +46,10 @@ extern void AlterTypeOwner(List *names, Oid newOwnerId, ObjectType objecttype);
extern void AlterTypeOwnerInternal(Oid typeOid, Oid newOwnerId,
bool hasDependEntry);
extern void AlterTypeNamespace(List *names, const char *newschema, ObjectType objecttype);
extern Oid AlterTypeNamespace_oid(Oid typeOid, Oid nspOid);
extern Oid AlterTypeNamespaceInternal(Oid typeOid, Oid nspOid,
extern Oid AlterTypeNamespace_oid(Oid typeOid, Oid nspOid, ObjectAddresses *objsMoved);
extern Oid AlterTypeNamespaceInternal(Oid typeOid, Oid nspOid,
bool isImplicitArray,
bool errorOnTableType);
bool errorOnTableType,
ObjectAddresses *objsMoved);
#endif /* TYPECMDS_H */