Re-allow specification of a new default value for an inherited column

in CREATE TABLE, but give a warning notice.  Clean up inconsistent
handling of defaults and NOT NULL flags from multiply-inherited columns.
Per pghackers discussion 28-Mar through 30-Mar.
This commit is contained in:
Tom Lane 2001-03-30 20:50:36 +00:00
parent 188e0fe457
commit 648c09181e
3 changed files with 355 additions and 294 deletions

View File

@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/commands/Attic/creatinh.c,v 1.74 2001/03/22 06:16:11 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/commands/Attic/creatinh.c,v 1.75 2001/03/30 20:50:36 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -34,11 +34,11 @@
* ----------------
*/
static int checkAttrExists(const char *attributeName,
const char *attributeType, List *schema);
static List *MergeAttributes(List *schema, List *supers, bool istemp,
List **supOids, List **supconstr);
static bool change_varattnos_of_a_node(Node *node, const AttrNumber *newattno);
static void StoreCatalogInheritance(Oid relationId, List *supers);
static int findAttrByName(const char *attributeName, List *schema);
static void setRelhassubclassInRelation(Oid relationId, bool relhassubclass);
@ -63,9 +63,10 @@ DefineRelation(CreateStmt *stmt, char relkind)
int i;
AttrNumber attnum;
if (strlen(stmt->relname) >= NAMEDATALEN)
elog(ERROR, "the relation name %s is >= %d characters long",
stmt->relname, NAMEDATALEN);
/*
* Truncate relname to appropriate length (probably a waste of time,
* as parser should have done this already).
*/
StrNCpy(relname, stmt->relname, NAMEDATALEN);
/*
@ -80,7 +81,7 @@ DefineRelation(CreateStmt *stmt, char relkind)
elog(ERROR, "DefineRelation: please inherit from a relation or define an attribute");
/*
* create a relation descriptor from the relation schema and create
* Create a relation descriptor from the relation schema and create
* the relation. Note that in this stage only inherited (pre-cooked)
* defaults and constraints will be included into the new relation.
* (BuildDescForRelation takes care of the inherited defaults, but we
@ -234,12 +235,339 @@ TruncateRelation(char *name)
heap_truncate(name);
}
/*----------
* MergeAttributes
* Returns new schema given initial schema and superclasses.
*
* Input arguments:
* 'schema' is the column/attribute definition for the table. (It's a list
* of ColumnDef's.) It is destructively changed.
* 'supers' is a list of names (as Value objects) of parent relations.
* 'istemp' is TRUE if we are creating a temp relation.
*
* Output arguments:
* 'supOids' receives an integer list of the OIDs of the parent relations.
* 'supconstr' receives a list of constraints belonging to the parents,
* updated as necessary to be valid for the child.
*
* Return value:
* Completed schema list.
*
* Notes:
* The order in which the attributes are inherited is very important.
* Intuitively, the inherited attributes should come first. If a table
* inherits from multiple parents, the order of those attributes are
* according to the order of the parents specified in CREATE TABLE.
*
* Here's an example:
*
* create table person (name text, age int4, location point);
* create table emp (salary int4, manager text) inherits(person);
* create table student (gpa float8) inherits (person);
* create table stud_emp (percent int4) inherits (emp, student);
*
* The order of the attributes of stud_emp is:
*
* person {1:name, 2:age, 3:location}
* / \
* {6:gpa} student emp {4:salary, 5:manager}
* \ /
* stud_emp {7:percent}
*
* If the same attribute name appears multiple times, then it appears
* in the result table in the proper location for its first appearance.
*----------
*/
static List *
MergeAttributes(List *schema, List *supers, bool istemp,
List **supOids, List **supconstr)
{
List *entry;
List *inhSchema = NIL;
List *parentOids = NIL;
List *constraints = NIL;
int child_attno;
/*
* Check for duplicate names in the explicit list of attributes.
*
* Although we might consider merging such entries in the same way that
* we handle name conflicts for inherited attributes, it seems to make
* more sense to assume such conflicts are errors.
*/
foreach(entry, schema)
{
ColumnDef *coldef = lfirst(entry);
List *rest;
foreach(rest, lnext(entry))
{
ColumnDef *restdef = lfirst(rest);
if (strcmp(coldef->colname, restdef->colname) == 0)
{
elog(ERROR, "CREATE TABLE: attribute \"%s\" duplicated",
coldef->colname);
}
}
}
/*
* Reject duplicate names in the list of parents, too.
*/
foreach(entry, supers)
{
List *rest;
foreach(rest, lnext(entry))
{
if (strcmp(strVal(lfirst(entry)), strVal(lfirst(rest))) == 0)
{
elog(ERROR, "CREATE TABLE: inherited relation \"%s\" duplicated",
strVal(lfirst(entry)));
}
}
}
/*
* Scan the parents left-to-right, and merge their attributes to form
* a list of inherited attributes (inhSchema).
*/
child_attno = 0;
foreach(entry, supers)
{
char *name = strVal(lfirst(entry));
Relation relation;
TupleDesc tupleDesc;
TupleConstr *constr;
AttrNumber *newattno;
AttrNumber parent_attno;
relation = heap_openr(name, AccessShareLock);
if (relation->rd_rel->relkind != RELKIND_RELATION)
elog(ERROR, "CREATE TABLE: inherited relation \"%s\" is not a table", name);
/* Permanent rels cannot inherit from temporary ones */
if (!istemp && is_temp_rel_name(name))
elog(ERROR, "CREATE TABLE: cannot inherit from temp relation \"%s\"", name);
/*
* We should have an UNDER permission flag for this, but for now,
* demand that creator of a child table own the parent.
*/
if (!pg_ownercheck(GetUserId(), name, RELNAME))
elog(ERROR, "you do not own table \"%s\"", name);
parentOids = lappendi(parentOids, relation->rd_id);
setRelhassubclassInRelation(relation->rd_id, true);
tupleDesc = RelationGetDescr(relation);
constr = tupleDesc->constr;
/*
* newattno[] will contain the child-table attribute numbers for
* the attributes of this parent table. (They are not the same
* for parents after the first one.)
*/
newattno = (AttrNumber *) palloc(tupleDesc->natts*sizeof(AttrNumber));
for (parent_attno = 1; parent_attno <= tupleDesc->natts;
parent_attno++)
{
Form_pg_attribute attribute = tupleDesc->attrs[parent_attno - 1];
char *attributeName;
char *attributeType;
HeapTuple tuple;
int exist_attno;
ColumnDef *def;
TypeName *typename;
/*
* Get name and type name of attribute
*/
attributeName = NameStr(attribute->attname);
tuple = SearchSysCache(TYPEOID,
ObjectIdGetDatum(attribute->atttypid),
0, 0, 0);
if (!HeapTupleIsValid(tuple))
elog(ERROR, "CREATE TABLE: cache lookup failed for type %u",
attribute->atttypid);
attributeType = pstrdup(NameStr(((Form_pg_type) GETSTRUCT(tuple))->typname));
ReleaseSysCache(tuple);
/*
* Does it conflict with some previously inherited column?
*/
exist_attno = findAttrByName(attributeName, inhSchema);
if (exist_attno > 0)
{
/*
* Yes, try to merge the two column definitions.
* They must have the same type and typmod.
*/
elog(NOTICE, "CREATE TABLE: merging multiple inherited definitions of attribute \"%s\"",
attributeName);
def = (ColumnDef *) nth(exist_attno - 1, inhSchema);
if (strcmp(def->typename->name, attributeType) != 0 ||
def->typename->typmod != attribute->atttypmod)
elog(ERROR, "CREATE TABLE: inherited attribute \"%s\" type conflict (%s and %s)",
attributeName, def->typename->name, attributeType);
/* Merge of NOT NULL constraints = OR 'em together */
def->is_not_null |= attribute->attnotnull;
/* Default and other constraints are handled below */
newattno[parent_attno - 1] = exist_attno;
}
else
{
/*
* No, create a new inherited column
*/
def = makeNode(ColumnDef);
def->colname = pstrdup(attributeName);
typename = makeNode(TypeName);
typename->name = attributeType;
typename->typmod = attribute->atttypmod;
def->typename = typename;
def->is_not_null = attribute->attnotnull;
def->is_sequence = false;
def->raw_default = NULL;
def->cooked_default = NULL;
def->constraints = NIL;
inhSchema = lappend(inhSchema, def);
newattno[parent_attno - 1] = ++child_attno;
}
/*
* Copy default if any, overriding any default from earlier parent
*/
if (attribute->atthasdef)
{
AttrDefault *attrdef;
int i;
def->raw_default = NULL;
def->cooked_default = NULL;
Assert(constr != NULL);
attrdef = constr->defval;
for (i = 0; i < constr->num_defval; i++)
{
if (attrdef[i].adnum == parent_attno)
{
/*
* if default expr could contain any vars, we'd
* need to fix 'em, but it can't ...
*/
def->cooked_default = pstrdup(attrdef[i].adbin);
break;
}
}
Assert(def->cooked_default != NULL);
}
}
/*
* Now copy the constraints of this parent, adjusting attnos using
* the completed newattno[] map
*/
if (constr && constr->num_check > 0)
{
ConstrCheck *check = constr->check;
int i;
for (i = 0; i < constr->num_check; i++)
{
Constraint *cdef = makeNode(Constraint);
Node *expr;
cdef->contype = CONSTR_CHECK;
if (check[i].ccname[0] == '$')
cdef->name = NULL;
else
cdef->name = pstrdup(check[i].ccname);
cdef->raw_expr = NULL;
/* adjust varattnos of ccbin here */
expr = stringToNode(check[i].ccbin);
change_varattnos_of_a_node(expr, newattno);
cdef->cooked_expr = nodeToString(expr);
constraints = lappend(constraints, cdef);
}
}
pfree(newattno);
/*
* Close the parent rel, but keep our AccessShareLock on it until
* xact commit. That will prevent someone else from deleting or
* ALTERing the parent before the child is committed.
*/
heap_close(relation, NoLock);
}
/*
* If we had no inherited attributes, the result schema is just the
* explicitly declared columns. Otherwise, we need to merge the
* declared columns into the inherited schema list.
*/
if (inhSchema != NIL)
{
foreach(entry, schema)
{
ColumnDef *newdef = lfirst(entry);
char *attributeName = newdef->colname;
char *attributeType = newdef->typename->name;
int exist_attno;
/*
* Does it conflict with some previously inherited column?
*/
exist_attno = findAttrByName(attributeName, inhSchema);
if (exist_attno > 0)
{
ColumnDef *def;
/*
* Yes, try to merge the two column definitions.
* They must have the same type and typmod.
*/
elog(NOTICE, "CREATE TABLE: merging attribute \"%s\" with inherited definition",
attributeName);
def = (ColumnDef *) nth(exist_attno - 1, inhSchema);
if (strcmp(def->typename->name, attributeType) != 0 ||
def->typename->typmod != newdef->typename->typmod)
elog(ERROR, "CREATE TABLE: attribute \"%s\" type conflict (%s and %s)",
attributeName, def->typename->name, attributeType);
/* Merge of NOT NULL constraints = OR 'em together */
def->is_not_null |= newdef->is_not_null;
/* If new def has a default, override previous default */
if (newdef->raw_default != NULL)
{
def->raw_default = newdef->raw_default;
def->cooked_default = newdef->cooked_default;
}
}
else
{
/*
* No, attach new column to result schema
*/
inhSchema = lappend(inhSchema, newdef);
}
}
schema = inhSchema;
}
*supOids = parentOids;
*supconstr = constraints;
return schema;
}
/*
* complementary static functions for MergeAttributes().
* Varattnos of pg_relcheck.rcbin should be rewritten when
* subclasses inherit the constraints from the super class.
* Note that these functions rewrite varattnos while walking
* through a node tree.
*
* Varattnos of pg_relcheck.rcbin must be rewritten when subclasses inherit
* constraints from parent classes, since the inherited attributes could
* be given different column numbers in multiple-inheritance cases.
*
* Note that the passed node tree is modified in place!
*/
static bool
change_varattnos_walker(Node *node, const AttrNumber *newattno)
@ -250,7 +578,8 @@ change_varattnos_walker(Node *node, const AttrNumber *newattno)
{
Var *var = (Var *) node;
if (var->varlevelsup == 0 && var->varno == 1)
if (var->varlevelsup == 0 && var->varno == 1 &&
var->varattno > 0)
{
/*
@ -273,273 +602,6 @@ change_varattnos_of_a_node(Node *node, const AttrNumber *newattno)
return change_varattnos_walker(node, newattno);
}
/*
* MergeAttributes
* Returns new schema given initial schema and supers.
*
* Input arguments:
*
* 'schema' is the column/attribute definition for the table. (It's a list
* of ColumnDef's.) It is destructively changed.
* 'supers' is a list of names (as Value objects) of parent relations.
* 'istemp' is TRUE if we are creating a temp relation.
*
* Output arguments:
* 'supOids' receives an integer list of the OIDs of the parent relations.
* 'supconstr' receives a list of constraints belonging to the parents.
*
* Notes:
* The order in which the attributes are inherited is very important.
* Intuitively, the inherited attributes should come first. If a table
* inherits from multiple parents, the order of those attributes are
* according to the order of the parents specified in CREATE TABLE.
*
* Here's an example:
*
* create table person (name text, age int4, location point);
* create table emp (salary int4, manager text) inherits(person);
* create table student (gpa float8) inherits (person);
* create table stud_emp (percent int4) inherits (emp, student);
*
* the order of the attributes of stud_emp is as follow:
*
*
* person {1:name, 2:age, 3:location}
* / \
* {6:gpa} student emp {4:salary, 5:manager}
* \ /
* stud_emp {7:percent}
*/
static List *
MergeAttributes(List *schema, List *supers, bool istemp,
List **supOids, List **supconstr)
{
List *entry;
List *inhSchema = NIL;
List *parentOids = NIL;
List *constraints = NIL;
int attnums;
/*
* Validates that there are no duplications. Validity checking of
* types occurs later.
*/
foreach(entry, schema)
{
ColumnDef *coldef = lfirst(entry);
List *rest;
foreach(rest, lnext(entry))
{
/*
* check for duplicated names within the new relation
*/
ColumnDef *restdef = lfirst(rest);
if (strcmp(coldef->colname, restdef->colname) == 0)
{
elog(ERROR, "CREATE TABLE: attribute \"%s\" duplicated",
coldef->colname);
}
}
}
foreach(entry, supers)
{
List *rest;
foreach(rest, lnext(entry))
{
if (strcmp(strVal(lfirst(entry)), strVal(lfirst(rest))) == 0)
{
elog(ERROR, "CREATE TABLE: inherited relation \"%s\" duplicated",
strVal(lfirst(entry)));
}
}
}
/*
* merge the inherited attributes into the schema
*/
attnums = 0;
foreach(entry, supers)
{
char *name = strVal(lfirst(entry));
Relation relation;
List *partialResult = NIL;
AttrNumber attrno;
TupleDesc tupleDesc;
TupleConstr *constr;
AttrNumber *newattno,
*partialAttidx;
Node *expr;
int i,
attidx,
attno_exist;
relation = heap_openr(name, AccessShareLock);
if (relation->rd_rel->relkind != RELKIND_RELATION)
elog(ERROR, "CREATE TABLE: inherited relation \"%s\" is not a table", name);
/* Permanent rels cannot inherit from temporary ones */
if (!istemp && is_temp_rel_name(name))
elog(ERROR, "CREATE TABLE: cannot inherit from temp relation \"%s\"", name);
/*
* We should have an UNDER permission flag for this, but for now,
* demand that creator of a child table own the parent.
*/
if (!pg_ownercheck(GetUserId(), name, RELNAME))
elog(ERROR, "you do not own table \"%s\"", name);
parentOids = lappendi(parentOids, relation->rd_id);
setRelhassubclassInRelation(relation->rd_id, true);
tupleDesc = RelationGetDescr(relation);
/* allocate a new attribute number table and initialize */
newattno = (AttrNumber *) palloc(tupleDesc->natts * sizeof(AttrNumber));
for (i = 0; i < tupleDesc->natts; i++)
newattno[i] = 0;
/*
* searching and storing order are different. another table is
* needed.
*/
partialAttidx = (AttrNumber *) palloc(tupleDesc->natts * sizeof(AttrNumber));
for (i = 0; i < tupleDesc->natts; i++)
partialAttidx[i] = 0;
constr = tupleDesc->constr;
attidx = 0;
for (attrno = relation->rd_rel->relnatts - 1; attrno >= 0; attrno--)
{
Form_pg_attribute attribute = tupleDesc->attrs[attrno];
char *attributeName;
char *attributeType;
HeapTuple tuple;
ColumnDef *def;
TypeName *typename;
/*
* form name, type and constraints
*/
attributeName = NameStr(attribute->attname);
tuple = SearchSysCache(TYPEOID,
ObjectIdGetDatum(attribute->atttypid),
0, 0, 0);
if (!HeapTupleIsValid(tuple))
elog(ERROR, "CREATE TABLE: cache lookup failed for type %u",
attribute->atttypid);
attributeType = pstrdup(NameStr(((Form_pg_type) GETSTRUCT(tuple))->typname));
ReleaseSysCache(tuple);
/*
* check validity
*
*/
if (checkAttrExists(attributeName, attributeType, schema) != 0)
elog(ERROR, "CREATE TABLE: attribute \"%s\" already exists in inherited schema",
attributeName);
if (0 < (attno_exist = checkAttrExists(attributeName, attributeType, inhSchema)))
{
/*
* this entry already exists
*/
newattno[attribute->attnum - 1] = attno_exist;
continue;
}
attidx++;
partialAttidx[attribute->attnum - 1] = attidx;
/*
* add an entry to the schema
*/
def = makeNode(ColumnDef);
typename = makeNode(TypeName);
def->colname = pstrdup(attributeName);
typename->name = pstrdup(attributeType);
typename->typmod = attribute->atttypmod;
def->typename = typename;
def->is_not_null = attribute->attnotnull;
def->raw_default = NULL;
def->cooked_default = NULL;
if (attribute->atthasdef)
{
AttrDefault *attrdef;
int i;
Assert(constr != NULL);
attrdef = constr->defval;
for (i = 0; i < constr->num_defval; i++)
{
if (attrdef[i].adnum == attrno + 1)
{
def->cooked_default = pstrdup(attrdef[i].adbin);
break;
}
}
Assert(def->cooked_default != NULL);
}
partialResult = lcons(def, partialResult);
}
for (i = 0; i < tupleDesc->natts; i++)
{
if (partialAttidx[i] > 0)
newattno[i] = attnums + attidx + 1 - partialAttidx[i];
}
attnums += attidx;
pfree(partialAttidx);
if (constr && constr->num_check > 0)
{
ConstrCheck *check = constr->check;
int i;
for (i = 0; i < constr->num_check; i++)
{
Constraint *cdef = makeNode(Constraint);
cdef->contype = CONSTR_CHECK;
if (check[i].ccname[0] == '$')
cdef->name = NULL;
else
cdef->name = pstrdup(check[i].ccname);
cdef->raw_expr = NULL;
/* adjust varattnos of ccbin here */
expr = stringToNode(check[i].ccbin);
change_varattnos_of_a_node(expr, newattno);
cdef->cooked_expr = nodeToString(expr);
constraints = lappend(constraints, cdef);
}
}
pfree(newattno);
/*
* Close the parent rel, but keep our AccessShareLock on it until
* xact commit. That will prevent someone else from deleting or
* ALTERing the parent before the child is committed.
*/
heap_close(relation, NoLock);
/*
* wants the inherited schema to appear in the order they are
* specified in CREATE TABLE
*/
inhSchema = nconc(inhSchema, partialResult);
}
/*
* put the inherited schema before our the schema for this table
*/
schema = nconc(inhSchema, schema);
*supOids = parentOids;
*supconstr = constraints;
return schema;
}
/*
* StoreCatalogInheritance
* Updates the system catalogs with proper inheritance information.
@ -716,15 +778,14 @@ again:
heap_close(relation, RowExclusiveLock);
}
/*
* returns the index (starting with 1) if attribute already exists in schema,
* Look for an existing schema entry with the given name.
*
* Returns the index (starting with 1) if attribute already exists in schema,
* 0 if it doesn't.
*/
static int
checkAttrExists(const char *attributeName, const char *attributeType,
List *schema)
findAttrByName(const char *attributeName, List *schema)
{
List *s;
int i = 0;
@ -735,21 +796,14 @@ checkAttrExists(const char *attributeName, const char *attributeType,
++i;
if (strcmp(attributeName, def->colname) == 0)
{
/*
* attribute exists. Make sure the types are the same.
*/
if (strcmp(attributeType, def->typename->name) != 0)
elog(ERROR, "CREATE TABLE: attribute \"%s\" type conflict (%s and %s)",
attributeName, attributeType, def->typename->name);
return i;
}
}
return 0;
}
/*
* Update a relation's pg_class.relhassubclass entry to the given value
*/
static void
setRelhassubclassInRelation(Oid relationId, bool relhassubclass)
{

View File

@ -81,6 +81,9 @@ CREATE TABLE student (
CREATE TABLE stud_emp (
percent int4
) INHERITS (emp, student);
NOTICE: CREATE TABLE: merging multiple inherited definitions of attribute "name"
NOTICE: CREATE TABLE: merging multiple inherited definitions of attribute "age"
NOTICE: CREATE TABLE: merging multiple inherited definitions of attribute "location"
CREATE TABLE city (
name name,
location box,
@ -132,6 +135,8 @@ CREATE TABLE c_star (
CREATE TABLE d_star (
d float8
) INHERITS (b_star, c_star);
NOTICE: CREATE TABLE: merging multiple inherited definitions of attribute "class"
NOTICE: CREATE TABLE: merging multiple inherited definitions of attribute "a"
CREATE TABLE e_star (
e int2
) INHERITS (c_star);

View File

@ -5,6 +5,8 @@ CREATE TABLE a (aa TEXT);
CREATE TABLE b (bb TEXT) INHERITS (a);
CREATE TABLE c (cc TEXT) INHERITS (a);
CREATE TABLE d (dd TEXT) INHERITS (b,c,a);
NOTICE: CREATE TABLE: merging multiple inherited definitions of attribute "aa"
NOTICE: CREATE TABLE: merging multiple inherited definitions of attribute "aa"
INSERT INTO a(aa) VALUES('aaa');
INSERT INTO a(aa) VALUES('aaaa');
INSERT INTO a(aa) VALUES('aaaaa');