Fix handling of CREATE TABLE LIKE with inheritance.

If a CREATE TABLE command uses both LIKE and traditional inheritance,
Vars in CHECK constraints and expression indexes that are absorbed
from a LIKE parent table tended to get mis-numbered, resulting in
wrong answers and/or bizarre error messages (though probably not any
actual crashes, thanks to validation occurring in the executor).

In v12 and up, the same could happen to Vars in GENERATED expressions,
even in cases with no LIKE clause but multiple traditional-inheritance
parents.

The cause of the problem for LIKE is that parse_utilcmd.c supposed
it could renumber such Vars correctly during transformCreateStmt(),
which it cannot since we have not yet accounted for columns added via
inheritance.  Fix that by postponing processing of LIKE INCLUDING
CONSTRAINTS, DEFAULTS, GENERATED, INDEXES till after we've performed
DefineRelation().

The error with GENERATED and multiple inheritance is a simple oversight
in MergeAttributes(); it knows it has to renumber Vars in inherited
CHECK constraints, but forgot to apply the same processing to inherited
GENERATED expressions (a/k/a defaults).

Per bug #16272 from Tom Gottfried.  The non-GENERATED variants of the
issue are ancient, presumably dating right back to the addition of
CREATE TABLE LIKE; hence back-patch to all supported branches.

Discussion: https://postgr.es/m/16272-6e32da020e9a9381@postgresql.org
This commit is contained in:
Tom Lane 2020-08-21 15:00:42 -04:00
parent eabba4a3eb
commit 5028981923
9 changed files with 496 additions and 236 deletions

View File

@ -405,6 +405,8 @@ static bool ConstraintImpliedByRelConstraint(Relation scanrel,
List *testConstraint, List *provenConstraint);
static ObjectAddress ATExecColumnDefault(Relation rel, const char *colName,
Node *newDefault, LOCKMODE lockmode);
static ObjectAddress ATExecCookedColumnDefault(Relation rel, AttrNumber attnum,
Node *newDefault);
static ObjectAddress ATExecAddIdentity(Relation rel, const char *colName,
Node *def, LOCKMODE lockmode);
static ObjectAddress ATExecSetIdentity(Relation rel, const char *colName,
@ -2054,8 +2056,8 @@ storage_name(char c)
* 'schema' is the column/attribute definition for the table. (It's a list
* of ColumnDef's.) It is destructively changed.
* 'supers' is a list of OIDs of parent relations, already locked by caller.
* 'relpersistence' is a persistence type of the table.
* 'is_partition' tells if the table is a partition
* 'relpersistence' is the persistence type of the table.
* 'is_partition' tells if the table is a partition.
*
* Output arguments:
* 'supconstr' receives a list of constraints belonging to the parents,
@ -2218,7 +2220,11 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
TupleDesc tupleDesc;
TupleConstr *constr;
AttrMap *newattmap;
List *inherited_defaults;
List *cols_with_defaults;
AttrNumber parent_attno;
ListCell *lc1;
ListCell *lc2;
/* caller already got lock */
relation = table_open(parent, NoLock);
@ -2304,6 +2310,9 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
*/
newattmap = make_attrmap(tupleDesc->natts);
/* We can't process inherited defaults until newattmap is complete. */
inherited_defaults = cols_with_defaults = NIL;
for (parent_attno = 1; parent_attno <= tupleDesc->natts;
parent_attno++)
{
@ -2359,7 +2368,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
get_collation_name(defCollId),
get_collation_name(attribute->attcollation))));
/* Copy storage parameter */
/* Copy/check storage parameter */
if (def->storage == 0)
def->storage = attribute->attstorage;
else if (def->storage != attribute->attstorage)
@ -2410,7 +2419,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
}
/*
* Copy default if any
* Locate default if any
*/
if (attribute->atthasdef)
{
@ -2432,23 +2441,59 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
Assert(this_default != NULL);
/*
* If default expr could contain any vars, we'd need to fix
* 'em, but it can't; so default is ready to apply to child.
*
* If we already had a default from some prior parent, check
* to see if they are the same. If so, no problem; if not,
* mark the column as having a bogus default. Below, we will
* complain if the bogus default isn't overridden by the child
* schema.
* If it's a GENERATED default, it might contain Vars that
* need to be mapped to the inherited column(s)' new numbers.
* We can't do that till newattmap is ready, so just remember
* all the inherited default expressions for the moment.
*/
Assert(def->raw_default == NULL);
if (def->cooked_default == NULL)
def->cooked_default = this_default;
else if (!equal(def->cooked_default, this_default))
{
def->cooked_default = &bogus_marker;
have_bogus_defaults = true;
}
inherited_defaults = lappend(inherited_defaults, this_default);
cols_with_defaults = lappend(cols_with_defaults, def);
}
}
/*
* Now process any inherited default expressions, adjusting attnos
* using the completed newattmap map.
*/
forboth(lc1, inherited_defaults, lc2, cols_with_defaults)
{
Node *this_default = (Node *) lfirst(lc1);
ColumnDef *def = (ColumnDef *) lfirst(lc2);
bool found_whole_row;
/* Adjust Vars to match new table's column numbering */
this_default = map_variable_attnos(this_default,
1, 0,
newattmap,
InvalidOid, &found_whole_row);
/*
* For the moment we have to reject whole-row variables. We could
* convert them, if we knew the new table's rowtype OID, but that
* hasn't been assigned yet. (A variable could only appear in a
* generation expression, so the error message is correct.)
*/
if (found_whole_row)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot convert whole-row table reference"),
errdetail("Generation expression for column \"%s\" contains a whole-row reference to table \"%s\".",
def->colname,
RelationGetRelationName(relation))));
/*
* If we already had a default from some prior parent, check to
* see if they are the same. If so, no problem; if not, mark the
* column as having a bogus default. Below, we will complain if
* the bogus default isn't overridden by the child schema.
*/
Assert(def->raw_default == NULL);
if (def->cooked_default == NULL)
def->cooked_default = this_default;
else if (!equal(def->cooked_default, this_default))
{
def->cooked_default = &bogus_marker;
have_bogus_defaults = true;
}
}
@ -2667,7 +2712,6 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
def->raw_default = newdef->raw_default;
def->cooked_default = newdef->cooked_default;
}
}
else
{
@ -3781,6 +3825,7 @@ AlterTableGetLockLevel(List *cmds)
* Theoretically, these could be ShareRowExclusiveLock.
*/
case AT_ColumnDefault:
case AT_CookedColumnDefault:
case AT_AlterConstraint:
case AT_AddIndex: /* from ADD CONSTRAINT */
case AT_AddIndexConstraint:
@ -4040,6 +4085,13 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
/* No command-specific prep needed */
pass = cmd->def ? AT_PASS_ADD_OTHERCONSTR : AT_PASS_DROP;
break;
case AT_CookedColumnDefault: /* add a pre-cooked default */
/* This is currently used only in CREATE TABLE */
/* (so the permission check really isn't necessary) */
ATSimplePermissions(rel, ATT_TABLE | ATT_FOREIGN_TABLE);
/* This command never recurses */
pass = AT_PASS_ADD_OTHERCONSTR;
break;
case AT_AddIdentity:
ATSimplePermissions(rel, ATT_TABLE | ATT_VIEW | ATT_FOREIGN_TABLE);
/* This command never recurses */
@ -4398,6 +4450,9 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */
address = ATExecColumnDefault(rel, cmd->name, cmd->def, lockmode);
break;
case AT_CookedColumnDefault: /* add a pre-cooked default */
address = ATExecCookedColumnDefault(rel, cmd->num, cmd->def);
break;
case AT_AddIdentity:
cmd = ATParseTransformCmd(wqueue, tab, rel, cmd, false, lockmode,
cur_pass, context);
@ -6859,6 +6914,35 @@ ATExecColumnDefault(Relation rel, const char *colName,
return address;
}
/*
* Add a pre-cooked default expression.
*
* Return the address of the affected column.
*/
static ObjectAddress
ATExecCookedColumnDefault(Relation rel, AttrNumber attnum,
Node *newDefault)
{
ObjectAddress address;
/* We assume no checking is required */
/*
* Remove any old default for the column. We use RESTRICT here for
* safety, but at present we do not expect anything to depend on the
* default. (In ordinary cases, there could not be a default in place
* anyway, but it's possible when combining LIKE with inheritance.)
*/
RemoveAttrDefault(RelationGetRelid(rel), attnum, DROP_RESTRICT, false,
true);
(void) StoreAttrDefault(rel, attnum, newDefault, true, false);
ObjectAddressSubSet(address, RelationRelationId,
RelationGetRelid(rel), attnum);
return address;
}
/*
* ALTER TABLE ALTER COLUMN ADD IDENTITY
*

View File

@ -86,7 +86,6 @@ typedef struct
List *ckconstraints; /* CHECK constraints */
List *fkconstraints; /* FOREIGN KEY constraints */
List *ixconstraints; /* index-creating constraints */
List *inh_indexes; /* cloned indexes from INCLUDING INDEXES */
List *extstats; /* cloned extended statistics */
List *blist; /* "before list" of things to do before
* creating the table */
@ -154,6 +153,9 @@ static Const *transformPartitionBoundValue(ParseState *pstate, Node *con,
* Returns a List of utility commands to be done in sequence. One of these
* will be the transformed CreateStmt, but there may be additional actions
* to be done before and after the actual DefineRelation() call.
* In addition to normal utility commands such as AlterTableStmt and
* IndexStmt, the result list may contain TableLikeClause(s), representing
* the need to perform additional parse analysis after DefineRelation().
*
* SQL allows constraints to be scattered all over, so thumb through
* the columns and collect all constraints into one place.
@ -241,7 +243,6 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
cxt.ckconstraints = NIL;
cxt.fkconstraints = NIL;
cxt.ixconstraints = NIL;
cxt.inh_indexes = NIL;
cxt.extstats = NIL;
cxt.blist = NIL;
cxt.alist = NIL;
@ -917,18 +918,18 @@ transformTableConstraint(CreateStmtContext *cxt, Constraint *constraint)
* transformTableLikeClause
*
* Change the LIKE <srctable> portion of a CREATE TABLE statement into
* column definitions which recreate the user defined column portions of
* <srctable>.
* column definitions that recreate the user defined column portions of
* <srctable>. Also, if there are any LIKE options that we can't fully
* process at this point, add the TableLikeClause to cxt->alist, which
* will cause utility.c to call expandTableLikeClause() after the new
* table has been created.
*/
static void
transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_clause)
{
AttrNumber parent_attno;
AttrNumber new_attno;
Relation relation;
TupleDesc tupleDesc;
TupleConstr *constr;
AttrMap *attmap;
AclResult aclresult;
char *comment;
ParseCallbackState pcbstate;
@ -942,6 +943,7 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("LIKE is not supported for creating foreign tables")));
/* Open the relation referenced by the LIKE clause */
relation = relation_openrv(table_like_clause->relation, AccessShareLock);
if (relation->rd_rel->relkind != RELKIND_RELATION &&
@ -978,37 +980,11 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
}
tupleDesc = RelationGetDescr(relation);
constr = tupleDesc->constr;
/*
* Initialize column number map for map_variable_attnos(). We need this
* since dropped columns in the source table aren't copied, so the new
* table can have different column numbers.
*/
attmap = make_attrmap(tupleDesc->natts);
/*
* We must fill the attmap now so that it can be used to process generated
* column default expressions in the per-column loop below.
*/
new_attno = 1;
for (parent_attno = 1; parent_attno <= tupleDesc->natts;
parent_attno++)
{
Form_pg_attribute attribute = TupleDescAttr(tupleDesc,
parent_attno - 1);
/*
* Ignore dropped columns in the parent. attmap entry is left zero.
*/
if (attribute->attisdropped)
continue;
attmap->attnums[parent_attno - 1] = list_length(cxt->columns) + (new_attno++);
}
/*
* Insert the copied attributes into the cxt for the new table definition.
* We must do this now so that they appear in the table in the relative
* position where the LIKE clause is, as required by SQL99.
*/
for (parent_attno = 1; parent_attno <= tupleDesc->natts;
parent_attno++)
@ -1052,52 +1028,12 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
cxt->columns = lappend(cxt->columns, def);
/*
* Copy default, if present and it should be copied. We have separate
* options for plain default expressions and GENERATED defaults.
* Although we don't transfer the column's default/generation
* expression now, we need to mark it GENERATED if appropriate.
*/
if (attribute->atthasdef &&
(attribute->attgenerated ?
(table_like_clause->options & CREATE_TABLE_LIKE_GENERATED) :
(table_like_clause->options & CREATE_TABLE_LIKE_DEFAULTS)))
{
Node *this_default = NULL;
AttrDefault *attrdef;
int i;
bool found_whole_row;
/* Find default in constraint structure */
Assert(constr != NULL);
attrdef = constr->defval;
for (i = 0; i < constr->num_defval; i++)
{
if (attrdef[i].adnum == parent_attno)
{
this_default = stringToNode(attrdef[i].adbin);
break;
}
}
Assert(this_default != NULL);
def->cooked_default = map_variable_attnos(this_default,
1, 0,
attmap,
InvalidOid, &found_whole_row);
/*
* Prevent this for the same reason as for constraints below. Note
* that defaults cannot contain any vars, so it's OK that the
* error message refers to generated columns.
*/
if (found_whole_row)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot convert whole-row table reference"),
errdetail("Generation expression for column \"%s\" contains a whole-row reference to table \"%s\".",
attributeName,
RelationGetRelationName(relation))));
if (attribute->atthasdef && attribute->attgenerated &&
(table_like_clause->options & CREATE_TABLE_LIKE_GENERATED))
def->generated = attribute->attgenerated;
}
/*
* Copy identity if requested
@ -1146,117 +1082,22 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
}
/*
* Copy CHECK constraints if requested, being careful to adjust attribute
* numbers so they match the child.
* We cannot yet deal with defaults, CHECK constraints, or indexes, since
* we don't yet know what column numbers the copied columns will have in
* the finished table. If any of those options are specified, add the
* LIKE clause to cxt->alist so that expandTableLikeClause will be called
* after we do know that.
*/
if ((table_like_clause->options & CREATE_TABLE_LIKE_CONSTRAINTS) &&
tupleDesc->constr)
{
TupleConstr *constr = tupleDesc->constr;
int ccnum;
for (ccnum = 0; ccnum < constr->num_check; ccnum++)
{
char *ccname = constr->check[ccnum].ccname;
char *ccbin = constr->check[ccnum].ccbin;
bool ccnoinherit = constr->check[ccnum].ccnoinherit;
Constraint *n = makeNode(Constraint);
Node *ccbin_node;
bool found_whole_row;
ccbin_node = map_variable_attnos(stringToNode(ccbin),
1, 0,
attmap,
InvalidOid, &found_whole_row);
/*
* We reject whole-row variables because the whole point of LIKE
* is that the new table's rowtype might later diverge from the
* parent's. So, while translation might be possible right now,
* it wouldn't be possible to guarantee it would work in future.
*/
if (found_whole_row)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot convert whole-row table reference"),
errdetail("Constraint \"%s\" contains a whole-row reference to table \"%s\".",
ccname,
RelationGetRelationName(relation))));
n->contype = CONSTR_CHECK;
n->conname = pstrdup(ccname);
n->location = -1;
n->is_no_inherit = ccnoinherit;
n->raw_expr = NULL;
n->cooked_expr = nodeToString(ccbin_node);
cxt->ckconstraints = lappend(cxt->ckconstraints, n);
/* Copy comment on constraint */
if ((table_like_clause->options & CREATE_TABLE_LIKE_COMMENTS) &&
(comment = GetComment(get_relation_constraint_oid(RelationGetRelid(relation),
n->conname, false),
ConstraintRelationId,
0)) != NULL)
{
CommentStmt *stmt = makeNode(CommentStmt);
stmt->objtype = OBJECT_TABCONSTRAINT;
stmt->object = (Node *) list_make3(makeString(cxt->relation->schemaname),
makeString(cxt->relation->relname),
makeString(n->conname));
stmt->comment = comment;
cxt->alist = lappend(cxt->alist, stmt);
}
}
}
if (table_like_clause->options &
(CREATE_TABLE_LIKE_DEFAULTS |
CREATE_TABLE_LIKE_GENERATED |
CREATE_TABLE_LIKE_CONSTRAINTS |
CREATE_TABLE_LIKE_INDEXES))
cxt->alist = lappend(cxt->alist, table_like_clause);
/*
* Likewise, copy indexes if requested
*/
if ((table_like_clause->options & CREATE_TABLE_LIKE_INDEXES) &&
relation->rd_rel->relhasindex)
{
List *parent_indexes;
ListCell *l;
parent_indexes = RelationGetIndexList(relation);
foreach(l, parent_indexes)
{
Oid parent_index_oid = lfirst_oid(l);
Relation parent_index;
IndexStmt *index_stmt;
parent_index = index_open(parent_index_oid, AccessShareLock);
/* Build CREATE INDEX statement to recreate the parent_index */
index_stmt = generateClonedIndexStmt(cxt->relation,
parent_index,
attmap,
NULL);
/* Copy comment on index, if requested */
if (table_like_clause->options & CREATE_TABLE_LIKE_COMMENTS)
{
comment = GetComment(parent_index_oid, RelationRelationId, 0);
/*
* We make use of IndexStmt's idxcomment option, so as not to
* need to know now what name the index will have.
*/
index_stmt->idxcomment = comment;
}
/* Save it in the inh_indexes list for the time being */
cxt->inh_indexes = lappend(cxt->inh_indexes, index_stmt);
index_close(parent_index, AccessShareLock);
}
}
/*
* Likewise, copy extended statistics if requested
* We may copy extended statistics if requested, since the representation
* of CreateStatsStmt doesn't depend on column numbers.
*/
if (table_like_clause->options & CREATE_TABLE_LIKE_STATISTICS)
{
@ -1292,12 +1133,277 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
list_free(parent_extstats);
}
/*
* Close the parent rel, but keep our AccessShareLock on it until xact
* commit. That will prevent someone else from deleting or ALTERing the
* parent before we can run expandTableLikeClause.
*/
table_close(relation, NoLock);
}
/*
* expandTableLikeClause
*
* Process LIKE options that require knowing the final column numbers
* assigned to the new table's columns. This executes after we have
* run DefineRelation for the new table. It returns a list of utility
* commands that should be run to generate indexes etc.
*/
List *
expandTableLikeClause(RangeVar *heapRel, TableLikeClause *table_like_clause)
{
List *result = NIL;
List *atsubcmds = NIL;
AttrNumber parent_attno;
Relation relation;
Relation childrel;
TupleDesc tupleDesc;
TupleConstr *constr;
AttrMap *attmap;
char *comment;
/*
* Open the relation referenced by the LIKE clause. We should still have
* the table lock obtained by transformTableLikeClause (and this'll throw
* an assertion failure if not). Hence, no need to recheck privileges
* etc.
*/
relation = relation_openrv(table_like_clause->relation, NoLock);
tupleDesc = RelationGetDescr(relation);
constr = tupleDesc->constr;
/*
* Open the newly-created child relation; we have lock on that too.
*/
childrel = relation_openrv(heapRel, NoLock);
/*
* Construct a map from the LIKE relation's attnos to the child rel's.
* This re-checks type match etc, although it shouldn't be possible to
* have a failure since both tables are locked.
*/
attmap = build_attrmap_by_name(RelationGetDescr(childrel),
tupleDesc);
/*
* Process defaults, if required.
*/
if ((table_like_clause->options &
(CREATE_TABLE_LIKE_DEFAULTS | CREATE_TABLE_LIKE_GENERATED)) &&
constr != NULL)
{
AttrDefault *attrdef = constr->defval;
for (parent_attno = 1; parent_attno <= tupleDesc->natts;
parent_attno++)
{
Form_pg_attribute attribute = TupleDescAttr(tupleDesc,
parent_attno - 1);
/*
* Ignore dropped columns in the parent.
*/
if (attribute->attisdropped)
continue;
/*
* Copy default, if present and it should be copied. We have
* separate options for plain default expressions and GENERATED
* defaults.
*/
if (attribute->atthasdef &&
(attribute->attgenerated ?
(table_like_clause->options & CREATE_TABLE_LIKE_GENERATED) :
(table_like_clause->options & CREATE_TABLE_LIKE_DEFAULTS)))
{
Node *this_default = NULL;
AlterTableCmd *atsubcmd;
bool found_whole_row;
/* Find default in constraint structure */
for (int i = 0; i < constr->num_defval; i++)
{
if (attrdef[i].adnum == parent_attno)
{
this_default = stringToNode(attrdef[i].adbin);
break;
}
}
Assert(this_default != NULL);
atsubcmd = makeNode(AlterTableCmd);
atsubcmd->subtype = AT_CookedColumnDefault;
atsubcmd->num = attmap->attnums[parent_attno - 1];
atsubcmd->def = map_variable_attnos(this_default,
1, 0,
attmap,
InvalidOid,
&found_whole_row);
/*
* Prevent this for the same reason as for constraints below.
* Note that defaults cannot contain any vars, so it's OK that
* the error message refers to generated columns.
*/
if (found_whole_row)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot convert whole-row table reference"),
errdetail("Generation expression for column \"%s\" contains a whole-row reference to table \"%s\".",
NameStr(attribute->attname),
RelationGetRelationName(relation))));
atsubcmds = lappend(atsubcmds, atsubcmd);
}
}
}
/*
* Copy CHECK constraints if requested, being careful to adjust attribute
* numbers so they match the child.
*/
if ((table_like_clause->options & CREATE_TABLE_LIKE_CONSTRAINTS) &&
constr != NULL)
{
int ccnum;
for (ccnum = 0; ccnum < constr->num_check; ccnum++)
{
char *ccname = constr->check[ccnum].ccname;
char *ccbin = constr->check[ccnum].ccbin;
bool ccnoinherit = constr->check[ccnum].ccnoinherit;
Node *ccbin_node;
bool found_whole_row;
Constraint *n;
AlterTableCmd *atsubcmd;
ccbin_node = map_variable_attnos(stringToNode(ccbin),
1, 0,
attmap,
InvalidOid, &found_whole_row);
/*
* We reject whole-row variables because the whole point of LIKE
* is that the new table's rowtype might later diverge from the
* parent's. So, while translation might be possible right now,
* it wouldn't be possible to guarantee it would work in future.
*/
if (found_whole_row)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot convert whole-row table reference"),
errdetail("Constraint \"%s\" contains a whole-row reference to table \"%s\".",
ccname,
RelationGetRelationName(relation))));
n = makeNode(Constraint);
n->contype = CONSTR_CHECK;
n->conname = pstrdup(ccname);
n->location = -1;
n->is_no_inherit = ccnoinherit;
n->raw_expr = NULL;
n->cooked_expr = nodeToString(ccbin_node);
/* We can skip validation, since the new table should be empty. */
n->skip_validation = true;
n->initially_valid = true;
atsubcmd = makeNode(AlterTableCmd);
atsubcmd->subtype = AT_AddConstraint;
atsubcmd->def = (Node *) n;
atsubcmds = lappend(atsubcmds, atsubcmd);
/* Copy comment on constraint */
if ((table_like_clause->options & CREATE_TABLE_LIKE_COMMENTS) &&
(comment = GetComment(get_relation_constraint_oid(RelationGetRelid(relation),
n->conname, false),
ConstraintRelationId,
0)) != NULL)
{
CommentStmt *stmt = makeNode(CommentStmt);
stmt->objtype = OBJECT_TABCONSTRAINT;
stmt->object = (Node *) list_make3(makeString(heapRel->schemaname),
makeString(heapRel->relname),
makeString(n->conname));
stmt->comment = comment;
result = lappend(result, stmt);
}
}
}
/*
* If we generated any ALTER TABLE actions above, wrap them into a single
* ALTER TABLE command. Stick it at the front of the result, so it runs
* before any CommentStmts we made above.
*/
if (atsubcmds)
{
AlterTableStmt *atcmd = makeNode(AlterTableStmt);
atcmd->relation = copyObject(heapRel);
atcmd->cmds = atsubcmds;
atcmd->objtype = OBJECT_TABLE;
atcmd->missing_ok = false;
result = lcons(atcmd, result);
}
/*
* Process indexes if required.
*/
if ((table_like_clause->options & CREATE_TABLE_LIKE_INDEXES) &&
relation->rd_rel->relhasindex)
{
List *parent_indexes;
ListCell *l;
parent_indexes = RelationGetIndexList(relation);
foreach(l, parent_indexes)
{
Oid parent_index_oid = lfirst_oid(l);
Relation parent_index;
IndexStmt *index_stmt;
parent_index = index_open(parent_index_oid, AccessShareLock);
/* Build CREATE INDEX statement to recreate the parent_index */
index_stmt = generateClonedIndexStmt(heapRel,
parent_index,
attmap,
NULL);
/* Copy comment on index, if requested */
if (table_like_clause->options & CREATE_TABLE_LIKE_COMMENTS)
{
comment = GetComment(parent_index_oid, RelationRelationId, 0);
/*
* We make use of IndexStmt's idxcomment option, so as not to
* need to know now what name the index will have.
*/
index_stmt->idxcomment = comment;
}
result = lappend(result, index_stmt);
index_close(parent_index, AccessShareLock);
}
}
/* Done with child rel */
table_close(childrel, NoLock);
/*
* Close the parent rel, but keep our AccessShareLock on it until xact
* commit. That will prevent someone else from deleting or ALTERing the
* parent before the child is committed.
*/
table_close(relation, NoLock);
return result;
}
static void
@ -1590,7 +1696,7 @@ generateClonedIndexStmt(RangeVar *heapRel, Relation source_idx,
attmap,
InvalidOid, &found_whole_row);
/* As in transformTableLikeClause, reject whole-row variables */
/* As in expandTableLikeClause, reject whole-row variables */
if (found_whole_row)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@ -1699,7 +1805,7 @@ generateClonedIndexStmt(RangeVar *heapRel, Relation source_idx,
attmap,
InvalidOid, &found_whole_row);
/* As in transformTableLikeClause, reject whole-row variables */
/* As in expandTableLikeClause, reject whole-row variables */
if (found_whole_row)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@ -1897,24 +2003,6 @@ transformIndexConstraints(CreateStmtContext *cxt)
indexlist = lappend(indexlist, index);
}
/* Add in any indexes defined by LIKE ... INCLUDING INDEXES */
foreach(lc, cxt->inh_indexes)
{
index = (IndexStmt *) lfirst(lc);
if (index->primary)
{
if (cxt->pkey != NULL)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("multiple primary keys for table \"%s\" are not allowed",
cxt->relation->relname)));
cxt->pkey = index;
}
indexlist = lappend(indexlist, index);
}
/*
* Scan the index list and remove any redundant index specifications. This
* can happen if, for instance, the user writes UNIQUE PRIMARY KEY. A
@ -3115,7 +3203,6 @@ transformAlterTableStmt(Oid relid, AlterTableStmt *stmt,
cxt.ckconstraints = NIL;
cxt.fkconstraints = NIL;
cxt.ixconstraints = NIL;
cxt.inh_indexes = NIL;
cxt.extstats = NIL;
cxt.blist = NIL;
cxt.alist = NIL;

View File

@ -1197,6 +1197,28 @@ ProcessUtilitySlow(ParseState *pstate,
secondaryObject,
stmt);
}
else if (IsA(stmt, TableLikeClause))
{
/*
* Do delayed processing of LIKE options. This
* will result in additional sub-statements for us
* to process. We can just tack those onto the
* to-do list.
*/
TableLikeClause *like = (TableLikeClause *) stmt;
RangeVar *rv = ((CreateStmt *) parsetree)->relation;
List *morestmts;
morestmts = expandTableLikeClause(rv, like);
stmts = list_concat(stmts, morestmts);
/*
* We don't need a CCI now, besides which the "l"
* list pointer is now possibly invalid, so just
* skip the CCI test below.
*/
continue;
}
else
{
/*
@ -1405,6 +1427,7 @@ ProcessUtilitySlow(ParseState *pstate,
IndexStmt *stmt = (IndexStmt *) parsetree;
Oid relid;
LOCKMODE lockmode;
bool is_alter_table;
if (stmt->concurrent)
PreventInTransactionBlock(isTopLevel,
@ -1466,6 +1489,17 @@ ProcessUtilitySlow(ParseState *pstate,
list_free(inheritors);
}
/*
* If the IndexStmt is already transformed, it must have
* come from generateClonedIndexStmt, which in current
* usage means it came from expandTableLikeClause rather
* than from original parse analysis. And that means we
* must treat it like ALTER TABLE ADD INDEX, not CREATE.
* (This is a bit grotty, but currently it doesn't seem
* worth adding a separate bool field for the purpose.)
*/
is_alter_table = stmt->transformed;
/* Run parse analysis ... */
stmt = transformIndexStmt(relid, stmt, queryString);
@ -1477,7 +1511,7 @@ ProcessUtilitySlow(ParseState *pstate,
InvalidOid, /* no predefined OID */
InvalidOid, /* no parent index */
InvalidOid, /* no parent constraint */
false, /* is_alter_table */
is_alter_table,
true, /* check_rights */
true, /* check_not_in_use */
false, /* skip_build */

View File

@ -1786,6 +1786,7 @@ typedef enum AlterTableType
AT_AddColumnRecurse, /* internal to commands/tablecmds.c */
AT_AddColumnToView, /* implicitly via CREATE OR REPLACE VIEW */
AT_ColumnDefault, /* alter column default */
AT_CookedColumnDefault, /* add a pre-cooked column default */
AT_DropNotNull, /* alter column drop not null */
AT_SetNotNull, /* alter column set not null */
AT_DropExpression, /* alter column drop expression */

View File

@ -31,6 +31,8 @@ extern void transformRuleStmt(RuleStmt *stmt, const char *queryString,
extern List *transformCreateSchemaStmt(CreateSchemaStmt *stmt);
extern PartitionBoundSpec *transformPartitionBound(ParseState *pstate, Relation parent,
PartitionBoundSpec *spec);
extern List *expandTableLikeClause(RangeVar *heapRel,
TableLikeClause *table_like_clause);
extern IndexStmt *generateClonedIndexStmt(RangeVar *heapRel,
Relation source_idx,
const struct AttrMap *attmap,

View File

@ -135,6 +135,8 @@ CREATE TABLE like_fkey_table (
INCLUDING STORAGE
);
NOTICE: DDL test: type simple, tag CREATE TABLE
NOTICE: DDL test: type alter table, tag ALTER TABLE
NOTICE: subcommand: ALTER COLUMN SET DEFAULT (precooked)
NOTICE: DDL test: type simple, tag CREATE INDEX
NOTICE: DDL test: type simple, tag CREATE INDEX
-- Volatile table types

View File

@ -111,6 +111,9 @@ get_altertable_subcmdtypes(PG_FUNCTION_ARGS)
case AT_ColumnDefault:
strtype = "ALTER COLUMN SET DEFAULT";
break;
case AT_CookedColumnDefault:
strtype = "ALTER COLUMN SET DEFAULT (precooked)";
break;
case AT_DropNotNull:
strtype = "DROP NOT NULL";
break;

View File

@ -160,7 +160,9 @@ SELECT * FROM test_like_gen_3;
DROP TABLE test_like_gen_1, test_like_gen_2, test_like_gen_3;
-- also test generated column with a "forward" reference (bug #16342)
CREATE TABLE test_like_4 (b int DEFAULT 42, c int GENERATED ALWAYS AS (a * 2) STORED, a int);
CREATE TABLE test_like_4 (b int DEFAULT 42,
c int GENERATED ALWAYS AS (a * 2) STORED,
a int CHECK (a > 0));
\d test_like_4
Table "public.test_like_4"
Column | Type | Collation | Nullable | Default
@ -168,6 +170,8 @@ CREATE TABLE test_like_4 (b int DEFAULT 42, c int GENERATED ALWAYS AS (a * 2) ST
b | integer | | | 42
c | integer | | | generated always as (a * 2) stored
a | integer | | |
Check constraints:
"test_like_4_a_check" CHECK (a > 0)
CREATE TABLE test_like_4a (LIKE test_like_4);
CREATE TABLE test_like_4b (LIKE test_like_4 INCLUDING DEFAULTS);
@ -233,7 +237,32 @@ SELECT a, b, c FROM test_like_4d;
11 | 42 | 22
(1 row)
-- Test renumbering of Vars when combining LIKE with inheritance
CREATE TABLE test_like_5 (x point, y point, z point);
CREATE TABLE test_like_5x (p int CHECK (p > 0),
q int GENERATED ALWAYS AS (p * 2) STORED);
CREATE TABLE test_like_5c (LIKE test_like_4 INCLUDING ALL)
INHERITS (test_like_5, test_like_5x);
\d test_like_5c
Table "public.test_like_5c"
Column | Type | Collation | Nullable | Default
--------+---------+-----------+----------+------------------------------------
x | point | | |
y | point | | |
z | point | | |
p | integer | | |
q | integer | | | generated always as (p * 2) stored
b | integer | | | 42
c | integer | | | generated always as (a * 2) stored
a | integer | | |
Check constraints:
"test_like_4_a_check" CHECK (a > 0)
"test_like_5x_p_check" CHECK (p > 0)
Inherits: test_like_5,
test_like_5x
DROP TABLE test_like_4, test_like_4a, test_like_4b, test_like_4c, test_like_4d;
DROP TABLE test_like_5, test_like_5x, test_like_5c;
CREATE TABLE inhg (x text, LIKE inhx INCLUDING INDEXES, y text); /* copies indexes */
INSERT INTO inhg VALUES (5, 10);
INSERT INTO inhg VALUES (20, 10); -- should fail
@ -269,9 +298,10 @@ ALTER TABLE ctlt1 ALTER COLUMN a SET STORAGE MAIN;
CREATE TABLE ctlt2 (c text);
ALTER TABLE ctlt2 ALTER COLUMN c SET STORAGE EXTERNAL;
COMMENT ON COLUMN ctlt2.c IS 'C';
CREATE TABLE ctlt3 (a text CHECK (length(a) < 5), c text);
CREATE TABLE ctlt3 (a text CHECK (length(a) < 5), c text CHECK (length(c) < 7));
ALTER TABLE ctlt3 ALTER COLUMN c SET STORAGE EXTERNAL;
ALTER TABLE ctlt3 ALTER COLUMN a SET STORAGE MAIN;
CREATE INDEX ctlt3_fnidx ON ctlt3 ((a || c));
COMMENT ON COLUMN ctlt3.a IS 'A3';
COMMENT ON COLUMN ctlt3.c IS 'C';
COMMENT ON CONSTRAINT ctlt3_a_check ON ctlt3 IS 't3_a_check';
@ -327,10 +357,11 @@ NOTICE: merging multiple inherited definitions of column "a"
Check constraints:
"ctlt1_a_check" CHECK (length(a) > 2)
"ctlt3_a_check" CHECK (length(a) < 5)
"ctlt3_c_check" CHECK (length(c) < 7)
Inherits: ctlt1,
ctlt3
CREATE TABLE ctlt13_like (LIKE ctlt3 INCLUDING CONSTRAINTS INCLUDING COMMENTS INCLUDING STORAGE) INHERITS (ctlt1);
CREATE TABLE ctlt13_like (LIKE ctlt3 INCLUDING CONSTRAINTS INCLUDING INDEXES INCLUDING COMMENTS INCLUDING STORAGE) INHERITS (ctlt1);
NOTICE: merging column "a" with inherited definition
\d+ ctlt13_like
Table "public.ctlt13_like"
@ -339,9 +370,12 @@ NOTICE: merging column "a" with inherited definition
a | text | | not null | | main | | A3
b | text | | | | extended | |
c | text | | | | external | | C
Indexes:
"ctlt13_like_expr_idx" btree ((a || c))
Check constraints:
"ctlt1_a_check" CHECK (length(a) > 2)
"ctlt3_a_check" CHECK (length(a) < 5)
"ctlt3_c_check" CHECK (length(c) < 7)
Inherits: ctlt1
SELECT description FROM pg_description, pg_constraint c WHERE classoid = 'pg_constraint'::regclass AND objoid = c.oid AND c.conrelid = 'ctlt13_like'::regclass;

View File

@ -66,7 +66,9 @@ SELECT * FROM test_like_gen_3;
DROP TABLE test_like_gen_1, test_like_gen_2, test_like_gen_3;
-- also test generated column with a "forward" reference (bug #16342)
CREATE TABLE test_like_4 (b int DEFAULT 42, c int GENERATED ALWAYS AS (a * 2) STORED, a int);
CREATE TABLE test_like_4 (b int DEFAULT 42,
c int GENERATED ALWAYS AS (a * 2) STORED,
a int CHECK (a > 0));
\d test_like_4
CREATE TABLE test_like_4a (LIKE test_like_4);
CREATE TABLE test_like_4b (LIKE test_like_4 INCLUDING DEFAULTS);
@ -84,7 +86,17 @@ SELECT a, b, c FROM test_like_4c;
\d test_like_4d
INSERT INTO test_like_4d (a) VALUES(11);
SELECT a, b, c FROM test_like_4d;
-- Test renumbering of Vars when combining LIKE with inheritance
CREATE TABLE test_like_5 (x point, y point, z point);
CREATE TABLE test_like_5x (p int CHECK (p > 0),
q int GENERATED ALWAYS AS (p * 2) STORED);
CREATE TABLE test_like_5c (LIKE test_like_4 INCLUDING ALL)
INHERITS (test_like_5, test_like_5x);
\d test_like_5c
DROP TABLE test_like_4, test_like_4a, test_like_4b, test_like_4c, test_like_4d;
DROP TABLE test_like_5, test_like_5x, test_like_5c;
CREATE TABLE inhg (x text, LIKE inhx INCLUDING INDEXES, y text); /* copies indexes */
INSERT INTO inhg VALUES (5, 10);
@ -119,9 +131,10 @@ CREATE TABLE ctlt2 (c text);
ALTER TABLE ctlt2 ALTER COLUMN c SET STORAGE EXTERNAL;
COMMENT ON COLUMN ctlt2.c IS 'C';
CREATE TABLE ctlt3 (a text CHECK (length(a) < 5), c text);
CREATE TABLE ctlt3 (a text CHECK (length(a) < 5), c text CHECK (length(c) < 7));
ALTER TABLE ctlt3 ALTER COLUMN c SET STORAGE EXTERNAL;
ALTER TABLE ctlt3 ALTER COLUMN a SET STORAGE MAIN;
CREATE INDEX ctlt3_fnidx ON ctlt3 ((a || c));
COMMENT ON COLUMN ctlt3.a IS 'A3';
COMMENT ON COLUMN ctlt3.c IS 'C';
COMMENT ON CONSTRAINT ctlt3_a_check ON ctlt3 IS 't3_a_check';
@ -138,7 +151,7 @@ CREATE TABLE ctlt1_inh (LIKE ctlt1 INCLUDING CONSTRAINTS INCLUDING COMMENTS) INH
SELECT description FROM pg_description, pg_constraint c WHERE classoid = 'pg_constraint'::regclass AND objoid = c.oid AND c.conrelid = 'ctlt1_inh'::regclass;
CREATE TABLE ctlt13_inh () INHERITS (ctlt1, ctlt3);
\d+ ctlt13_inh
CREATE TABLE ctlt13_like (LIKE ctlt3 INCLUDING CONSTRAINTS INCLUDING COMMENTS INCLUDING STORAGE) INHERITS (ctlt1);
CREATE TABLE ctlt13_like (LIKE ctlt3 INCLUDING CONSTRAINTS INCLUDING INDEXES INCLUDING COMMENTS INCLUDING STORAGE) INHERITS (ctlt1);
\d+ ctlt13_like
SELECT description FROM pg_description, pg_constraint c WHERE classoid = 'pg_constraint'::regclass AND objoid = c.oid AND c.conrelid = 'ctlt13_like'::regclass;