Do a pass of code review for the ALTER TABLE ADD INHERITS patch. Keep

the read lock we hold on the table's parent relation until commit.
Update equalfuncs.c for the new field in AlterTableCmd. Various
improvements to comments, variable names, and error reporting.

There is room for further improvement here, but this is at least
a step in the right direction.
This commit is contained in:
Neil Conway 2006-07-02 05:17:26 +00:00
parent 277807bd9e
commit 7fb9090ebf
4 changed files with 135 additions and 138 deletions

View File

@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.190 2006/07/02 02:23:19 momjian Exp $
* $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.191 2006/07/02 05:17:26 neilc Exp $
*
*-------------------------------------------------------------------------
*/
@ -158,8 +158,8 @@ typedef struct NewColumnValue
static void truncate_check_rel(Relation rel);
static List *MergeAttributes(List *schema, List *supers, bool istemp,
List **supOids, List **supconstr, int *supOidCount);
static void MergeConstraintsIntoExisting(Relation rel, Relation relation);
static void MergeAttributesIntoExisting(Relation rel, Relation relation);
static void MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel);
static void MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel);
static bool change_varattnos_walker(Node *node, const AttrNumber *newattno);
static void StoreCatalogInheritance(Oid relationId, List *supers);
static void StoreCatalogInheritance1(Oid relationId, Oid parentOid,
@ -1233,55 +1233,55 @@ StoreCatalogInheritance(Oid relationId, List *supers)
foreach(entry, supers)
{
StoreCatalogInheritance1(relationId, lfirst_oid(entry), seqNumber, relation);
seqNumber += 1;
seqNumber++;
}
heap_close(relation, RowExclusiveLock);
}
static void
StoreCatalogInheritance1(Oid relationId, Oid parentOid, int16 seqNumber, Relation relation)
StoreCatalogInheritance1(Oid relationId, Oid parentOid,
int16 seqNumber, Relation relation)
{
Datum datum[Natts_pg_inherits];
char nullarr[Natts_pg_inherits];
ObjectAddress childobject,
Datum datum[Natts_pg_inherits];
char nullarr[Natts_pg_inherits];
ObjectAddress childobject,
parentobject;
HeapTuple tuple;
TupleDesc desc = RelationGetDescr(relation);
HeapTuple tuple;
TupleDesc desc = RelationGetDescr(relation);
datum[0] = ObjectIdGetDatum(relationId); /* inhrel */
datum[1] = ObjectIdGetDatum(parentOid); /* inhparent */
datum[2] = Int16GetDatum(seqNumber); /* inhseqno */
datum[0] = ObjectIdGetDatum(relationId); /* inhrel */
datum[1] = ObjectIdGetDatum(parentOid); /* inhparent */
datum[2] = Int16GetDatum(seqNumber); /* inhseqno */
nullarr[0] = ' ';
nullarr[1] = ' ';
nullarr[2] = ' ';
nullarr[0] = ' ';
nullarr[1] = ' ';
nullarr[2] = ' ';
tuple = heap_formtuple(desc, datum, nullarr);
tuple = heap_formtuple(desc, datum, nullarr);
simple_heap_insert(relation, tuple);
simple_heap_insert(relation, tuple);
CatalogUpdateIndexes(relation, tuple);
CatalogUpdateIndexes(relation, tuple);
heap_freetuple(tuple);
heap_freetuple(tuple);
/*
* Store a dependency too
*/
parentobject.classId = RelationRelationId;
parentobject.objectId = parentOid;
parentobject.objectSubId = 0;
childobject.classId = RelationRelationId;
childobject.objectId = relationId;
childobject.objectSubId = 0;
/*
* Store a dependency too
*/
parentobject.classId = RelationRelationId;
parentobject.objectId = parentOid;
parentobject.objectSubId = 0;
childobject.classId = RelationRelationId;
childobject.objectId = relationId;
childobject.objectSubId = 0;
recordDependencyOn(&childobject, &parentobject, DEPENDENCY_NORMAL);
/*
* Mark the parent as having subclasses.
*/
setRelhassubclassInRelation(parentOid, true);
recordDependencyOn(&childobject, &parentobject, DEPENDENCY_NORMAL);
/*
* Mark the parent as having subclasses.
*/
setRelhassubclassInRelation(parentOid, true);
}
/*
@ -6048,31 +6048,34 @@ ATExecEnableDisableTrigger(Relation rel, char *trigname,
}
static char *
decompile_conbin(HeapTuple contuple, TupleDesc tupledesc)
decompile_conbin(HeapTuple contup, TupleDesc tupdesc)
{
Form_pg_constraint con = (Form_pg_constraint)(GETSTRUCT(contuple));
bool isnull;
Datum d;
Form_pg_constraint con;
bool isnull;
Datum attr;
Datum expr;
d = fastgetattr(contuple, Anum_pg_constraint_conbin, tupledesc, &isnull);
con = (Form_pg_constraint) GETSTRUCT(contup);
attr = heap_getattr(contup, Anum_pg_constraint_conbin, tupdesc, &isnull);
if (isnull)
elog(ERROR, "conbin is null for constraint \"%s\"", NameStr(con->conname));
d = DirectFunctionCall2(pg_get_expr, d, ObjectIdGetDatum(con->conrelid));
return DatumGetCString(DirectFunctionCall1(textout,d));
elog(ERROR, "null conbin for constraint %u", HeapTupleGetOid(contup));
expr = DirectFunctionCall2(pg_get_expr, attr,
ObjectIdGetDatum(con->conrelid));
return DatumGetCString(DirectFunctionCall1(textout, expr));
}
/* ALTER TABLE INHERIT */
/* Add a parent to the child's parents. This verifies that all the columns and
/*
* ALTER TABLE INHERIT
*
* Add a parent to the child's parents. This verifies that all the columns and
* check constraints of the parent appear in the child and that they have the
* same data type and expressions.
*/
static void
ATExecAddInherits(Relation rel, RangeVar *parent)
ATExecAddInherits(Relation child_rel, RangeVar *parent)
{
Relation relation,
Relation parent_rel,
catalogRelation;
SysScanDesc scan;
ScanKeyData key;
@ -6080,50 +6083,49 @@ ATExecAddInherits(Relation rel, RangeVar *parent)
int4 inhseqno;
List *children;
/* XXX is this enough locking? */
relation = heap_openrv(parent, AccessShareLock);
parent_rel = heap_openrv(parent, AccessShareLock);
/*
* Must be owner of both parent and child -- child is taken care of by
* ATSimplePermissions call in ATPrepCmd
*/
ATSimplePermissions(relation, false);
ATSimplePermissions(parent_rel, false);
/* Permanent rels cannot inherit from temporary ones */
if (!isTempNamespace(RelationGetNamespace(rel)) &&
isTempNamespace(RelationGetNamespace(relation)))
if (!isTempNamespace(RelationGetNamespace(child_rel)) &&
isTempNamespace(RelationGetNamespace(parent_rel)))
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot inherit from temporary relation \"%s\"",
parent->relname)));
/* If parent has OIDs then all children must have OIDs */
if (relation->rd_rel->relhasoids && !rel->rd_rel->relhasoids)
if (parent_rel->rd_rel->relhasoids && !child_rel->rd_rel->relhasoids)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("table \"%s\" without OIDs cannot inherit from table \"%s\" with OIDs",
RelationGetRelationName(rel), parent->relname)));
RelationGetRelationName(child_rel), parent->relname)));
/*
* Reject duplications in the list of parents. We scan through the list of
* parents in pg_inherit and keep track of the first open inhseqno slot
* found to use for the new parent.
* Don't allow any duplicates in the list of parents. We scan through the
* list of parents in pg_inherit and keep track of the first open inhseqno
* slot found to use for the new parent.
*/
catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
ScanKeyInit(&key,
Anum_pg_inherits_inhrelid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(RelationGetRelid(rel)));
ObjectIdGetDatum(RelationGetRelid(child_rel)));
scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId,
true, SnapshotNow, 1, &key);
inhseqno = 0; /* inhseqno sequences are supposed to start at
* 1 */
true, SnapshotNow, 1, &key);
/* inhseqno sequences start at 1 */
inhseqno = 0;
while (HeapTupleIsValid(inheritsTuple = systable_getnext(scan)))
{
Form_pg_inherits inh = (Form_pg_inherits) GETSTRUCT(inheritsTuple);
if (inh->inhparent == RelationGetRelid(relation))
if (inh->inhparent == RelationGetRelid(parent_rel))
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_TABLE),
errmsg("inherited relation \"%s\" duplicated",
@ -6135,36 +6137,33 @@ ATExecAddInherits(Relation rel, RangeVar *parent)
heap_close(catalogRelation, RowExclusiveLock);
/*
* If the new parent is found in our list of inheritors we have a circular
* If the new parent is found in our list of inheritors, we have a circular
* structure
*/
children = find_all_inheritors(RelationGetRelid(child_rel));
/* this routine is actually in the planner */
children = find_all_inheritors(RelationGetRelid(rel));
if (list_member_oid(children, RelationGetRelid(relation)))
if (list_member_oid(children, RelationGetRelid(parent_rel)))
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_TABLE),
errmsg("circular inheritance structure found, \"%s\" is already a child of \"%s\"",
parent->relname, RelationGetRelationName(rel))));
errmsg("circular inheritance structure found"),
errdetail("\"%s\" is already a child of \"%s\".",
parent->relname,
RelationGetRelationName(child_rel))));
/* Match up the columns and bump attinhcount and attislocal */
MergeAttributesIntoExisting(rel, relation);
MergeAttributesIntoExisting(child_rel, parent_rel);
/* Match up the constraints and make sure they're present in child */
MergeConstraintsIntoExisting(rel, relation);
MergeConstraintsIntoExisting(child_rel, parent_rel);
/*
* Use this refactored part of StoreCatalogInheritance which CREATE TABLE
* uses to add the pg_inherit line
*/
catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
StoreCatalogInheritance1(RelationGetRelid(rel), RelationGetRelid(relation),
inhseqno + 1, catalogRelation);
StoreCatalogInheritance1(RelationGetRelid(child_rel),
RelationGetRelid(parent_rel),
inhseqno + 1, catalogRelation);
heap_close(catalogRelation, RowExclusiveLock);
heap_close(relation, AccessShareLock);
/* keep our lock on the parent relation until commit */
heap_close(parent_rel, NoLock);
}
/*
@ -6175,28 +6174,24 @@ ATExecAddInherits(Relation rel, RangeVar *parent)
* Currently all columns must be found in child. Missing columns are an error.
* One day we might consider creating new columns like CREATE TABLE does.
*
* The data type must match perfectly, if the parent column is NOT NULL then
* The data type must match perfectly. If the parent column is NOT NULL then
* the child table must be as well. Defaults are ignored however.
*
*/
static void
MergeAttributesIntoExisting(Relation rel, Relation relation)
MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel)
{
Relation attrdesc;
AttrNumber parent_attno,
child_attno;
TupleDesc tupleDesc;
Relation attrdesc;
AttrNumber parent_attno;
int parent_natts;
TupleDesc tupleDesc;
TupleConstr *constr;
HeapTuple tuple;
HeapTuple tuple;
child_attno = RelationGetNumberOfAttributes(rel);
tupleDesc = RelationGetDescr(relation);
tupleDesc = RelationGetDescr(parent_rel);
parent_natts = tupleDesc->natts;
constr = tupleDesc->constr;
for (parent_attno = 1; parent_attno <= tupleDesc->natts;
parent_attno++)
for (parent_attno = 1; parent_attno <= parent_natts; parent_attno++)
{
Form_pg_attribute attribute = tupleDesc->attrs[parent_attno - 1];
char *attributeName = NameStr(attribute->attname);
@ -6208,7 +6203,8 @@ MergeAttributesIntoExisting(Relation rel, Relation relation)
/* Does it conflict with an existing column? */
attrdesc = heap_open(AttributeRelationId, RowExclusiveLock);
tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), attributeName);
tuple = SearchSysCacheCopyAttName(RelationGetRelid(child_rel),
attributeName);
if (HeapTupleIsValid(tuple))
{
/*
@ -6218,12 +6214,17 @@ MergeAttributesIntoExisting(Relation rel, Relation relation)
Form_pg_attribute childatt = (Form_pg_attribute) GETSTRUCT(tuple);
if (attribute->atttypid != childatt->atttypid ||
attribute->atttypmod != childatt->atttypmod ||
(attribute->attnotnull && !childatt->attnotnull))
attribute->atttypmod != childatt->atttypmod)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("child table \"%s\" has different type for column \"%s\"",
RelationGetRelationName(rel), NameStr(attribute->attname))));
RelationGetRelationName(child_rel), NameStr(attribute->attname))));
if (attribute->attnotnull && !childatt->attnotnull)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("column \"%s\" in child table must be NOT NULL",
NameStr(attribute->attname))));
childatt->attinhcount++;
simple_heap_update(attrdesc, &tuple->t_self, tuple);
@ -6239,8 +6240,6 @@ MergeAttributesIntoExisting(Relation rel, Relation relation)
else
{
/*
* No, create a new inherited column
*
* Creating inherited columns in this case seems to be unpopular.
* In the common use case of partitioned tables it's a foot-gun.
*/
@ -6249,9 +6248,9 @@ MergeAttributesIntoExisting(Relation rel, Relation relation)
errmsg("child table missing column \"%s\"",
NameStr(attribute->attname))));
}
heap_close(attrdesc, RowExclusiveLock);
}
}
/*
@ -6270,9 +6269,8 @@ MergeAttributesIntoExisting(Relation rel, Relation relation)
* constraints. As long as tables have more like 10 constraints it shouldn't be
* an issue though. Even 100 constraints ought not be the end of the world.
*/
static void
MergeConstraintsIntoExisting(Relation rel, Relation relation)
MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel)
{
Relation catalogRelation;
TupleDesc tupleDesc;
@ -6290,18 +6288,18 @@ MergeConstraintsIntoExisting(Relation rel, Relation relation)
Anum_pg_constraint_conrelid,
BTEqualStrategyNumber,
F_OIDEQ,
ObjectIdGetDatum(RelationGetRelid(rel)));
ObjectIdGetDatum(RelationGetRelid(child_rel)));
scan = systable_beginscan(catalogRelation, ConstraintRelidIndexId,
true, SnapshotNow, 1, &key);
true, SnapshotNow, 1, &key);
constraints = NIL;
while (HeapTupleIsValid(constraintTuple = systable_getnext(scan)))
{
Form_pg_constraint con = (Form_pg_constraint) (GETSTRUCT(constraintTuple));
Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(constraintTuple);
if (con->contype != CONSTRAINT_CHECK)
continue;
/* XXX Do I need the copytuple here? */
constraints = lappend(constraints, heap_copytuple(constraintTuple));
}
systable_endscan(scan);
@ -6311,13 +6309,13 @@ MergeConstraintsIntoExisting(Relation rel, Relation relation)
Anum_pg_constraint_conrelid,
BTEqualStrategyNumber,
F_OIDEQ,
ObjectIdGetDatum(RelationGetRelid(relation)));
ObjectIdGetDatum(RelationGetRelid(parent_rel)));
scan = systable_beginscan(catalogRelation, ConstraintRelidIndexId, true,
SnapshotNow, 1, &key);
SnapshotNow, 1, &key);
while (HeapTupleIsValid(constraintTuple = systable_getnext(scan)))
{
bool found = false;
Form_pg_constraint parent_con = (Form_pg_constraint) (GETSTRUCT(constraintTuple));
Form_pg_constraint parent_con = (Form_pg_constraint) GETSTRUCT(constraintTuple);
Form_pg_constraint child_con = NULL;
HeapTuple child_contuple = NULL;
@ -6327,7 +6325,7 @@ MergeConstraintsIntoExisting(Relation rel, Relation relation)
foreach(elem, constraints)
{
child_contuple = lfirst(elem);
child_con = (Form_pg_constraint) (GETSTRUCT(child_contuple));
child_con = (Form_pg_constraint) GETSTRUCT(child_contuple);
if (!strcmp(NameStr(parent_con->conname),
NameStr(child_con->conname)))
{
@ -6357,13 +6355,15 @@ MergeConstraintsIntoExisting(Relation rel, Relation relation)
* would have to bump them just like attributes
*/
}
systable_endscan(scan);
heap_close(catalogRelation, AccessShareLock);
}
/* ALTER TABLE NO INHERIT */
/* Drop a parent from the child's parents. This just adjusts the attinhcount
/*
* ALTER TABLE NO INHERIT
*
* Drop a parent from the child's parents. This just adjusts the attinhcount
* and attislocal of the columns and removes the pg_inherit and pg_depend
* entries.
*
@ -6373,12 +6373,9 @@ MergeConstraintsIntoExisting(Relation rel, Relation relation)
* least we'll never surprise by dropping columns someone isn't expecting to be
* dropped which would actually mean data loss.
*/
static void
ATExecDropInherits(Relation rel, RangeVar *parent)
{
Relation catalogRelation;
SysScanDesc scan;
ScanKeyData key[2];
@ -6395,10 +6392,6 @@ ATExecDropInherits(Relation rel, RangeVar *parent)
* be clever and look at each parent and see if it matches but that would
* be inconsistent with other operations I think.
*/
Assert(rel);
Assert(parent);
dropparent = RangeVarGetRelid(parent, false);
/* Search through the direct parents of rel looking for dropparent oid */
@ -6408,20 +6401,23 @@ ATExecDropInherits(Relation rel, RangeVar *parent)
Anum_pg_inherits_inhrelid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(RelationGetRelid(rel)));
scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId, true, SnapshotNow, 1, key);
while (!found && HeapTupleIsValid(inheritsTuple = systable_getnext(scan)))
scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId,
true, SnapshotNow, 1, key);
while (HeapTupleIsValid(inheritsTuple = systable_getnext(scan)))
{
inhparent = ((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhparent;
if (inhparent == dropparent)
{
simple_heap_delete(catalogRelation, &inheritsTuple->t_self);
found = true;
break;
}
}
systable_endscan(scan);
heap_close(catalogRelation, RowExclusiveLock);
if (!found)
{
if (parent->schemaname)
@ -6445,10 +6441,10 @@ ATExecDropInherits(Relation rel, RangeVar *parent)
F_OIDEQ,
ObjectIdGetDatum(RelationGetRelid(rel)));
scan = systable_beginscan(catalogRelation, AttributeRelidNumIndexId,
true, SnapshotNow, 1, key);
true, SnapshotNow, 1, key);
while (HeapTupleIsValid(attributeTuple = systable_getnext(scan)))
{
Form_pg_attribute att = ((Form_pg_attribute) GETSTRUCT(attributeTuple));
Form_pg_attribute att = (Form_pg_attribute) GETSTRUCT(attributeTuple);
/*
* Not an inherited column at all (do NOT use islocal for this
@ -6463,7 +6459,7 @@ ATExecDropInherits(Relation rel, RangeVar *parent)
{
/* Decrement inhcount and possibly set islocal to 1 */
HeapTuple copyTuple = heap_copytuple(attributeTuple);
Form_pg_attribute copy_att = ((Form_pg_attribute) GETSTRUCT(copyTuple));
Form_pg_attribute copy_att = (Form_pg_attribute) GETSTRUCT(copyTuple);
copy_att->attinhcount--;
if (copy_att->attinhcount == 0)
@ -6485,7 +6481,6 @@ ATExecDropInherits(Relation rel, RangeVar *parent)
systable_endscan(scan);
heap_close(catalogRelation, RowExclusiveLock);
/*
* Drop the dependency
*
@ -6519,12 +6514,11 @@ ATExecDropInherits(Relation rel, RangeVar *parent)
* just in case...
*/
simple_heap_delete(catalogRelation, &depTuple->t_self);
break;
}
}
systable_endscan(scan);
systable_endscan(scan);
heap_close(catalogRelation, RowExclusiveLock);
}

View File

@ -15,7 +15,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/nodes/copyfuncs.c,v 1.340 2006/07/02 02:23:20 momjian Exp $
* $PostgreSQL: pgsql/src/backend/nodes/copyfuncs.c,v 1.341 2006/07/02 05:17:26 neilc Exp $
*
*-------------------------------------------------------------------------
*/
@ -1801,8 +1801,8 @@ _copyAlterTableCmd(AlterTableCmd *from)
COPY_SCALAR_FIELD(subtype);
COPY_STRING_FIELD(name);
COPY_NODE_FIELD(def);
COPY_NODE_FIELD(parent);
COPY_NODE_FIELD(def);
COPY_NODE_FIELD(transform);
COPY_SCALAR_FIELD(behavior);

View File

@ -18,7 +18,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/nodes/equalfuncs.c,v 1.274 2006/07/02 02:23:20 momjian Exp $
* $PostgreSQL: pgsql/src/backend/nodes/equalfuncs.c,v 1.275 2006/07/02 05:17:26 neilc Exp $
*
*-------------------------------------------------------------------------
*/
@ -765,6 +765,7 @@ _equalAlterTableCmd(AlterTableCmd *a, AlterTableCmd *b)
{
COMPARE_SCALAR_FIELD(subtype);
COMPARE_STRING_FIELD(name);
COMPARE_NODE_FIELD(parent);
COMPARE_NODE_FIELD(def);
COMPARE_NODE_FIELD(transform);
COMPARE_SCALAR_FIELD(behavior);

View File

@ -342,9 +342,11 @@ alter table atacc3 inherit atacc2;
alter table atacc3 inherit atacc2;
ERROR: inherited relation "atacc2" duplicated
alter table atacc2 inherit atacc3;
ERROR: circular inheritance structure found, "atacc3" is already a child of "atacc2"
ERROR: circular inheritance structure found
DETAIL: "atacc3" is already a child of "atacc2".
alter table atacc2 inherit atacc2;
ERROR: circular inheritance structure found, "atacc2" is already a child of "atacc2"
ERROR: circular inheritance structure found
DETAIL: "atacc2" is already a child of "atacc2".
-- test that we really are a child now (should see 4 not 3 and cascade should go through)
select test2 from atacc2;
test2