diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 70753e33e4..d69809a2f8 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -68,6 +68,7 @@ #include "parser/parser.h" #include "rewrite/rewriteDefine.h" #include "rewrite/rewriteHandler.h" +#include "rewrite/rewriteManip.h" #include "storage/bufmgr.h" #include "storage/lmgr.h" #include "storage/lock.h" @@ -253,7 +254,6 @@ static void truncate_check_rel(Relation rel); static List *MergeAttributes(List *schema, List *supers, char relpersistence, List **supOids, List **supconstr, int *supOidCount); static bool MergeCheckConstraint(List *constraints, char *name, Node *expr); -static bool change_varattnos_walker(Node *node, const AttrNumber *newattno); static void MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel); static void MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel); static void StoreCatalogInheritance(Oid relationId, List *supers); @@ -1496,7 +1496,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence, * parents after the first one, nor if we have dropped columns.) */ newattno = (AttrNumber *) - palloc(tupleDesc->natts * sizeof(AttrNumber)); + palloc0(tupleDesc->natts * sizeof(AttrNumber)); for (parent_attno = 1; parent_attno <= tupleDesc->natts; parent_attno++) @@ -1510,15 +1510,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence, * Ignore dropped columns in the parent. */ if (attribute->attisdropped) - { - /* - * change_varattnos_of_a_node asserts that this is greater - * than zero, so if anything tries to use it, we should find - * out. - */ - newattno[parent_attno - 1] = 0; - continue; - } + continue; /* leave newattno entry as zero */ /* * Does it conflict with some previously inherited column? @@ -1656,14 +1648,30 @@ MergeAttributes(List *schema, List *supers, char relpersistence, { char *name = check[i].ccname; Node *expr; + bool found_whole_row; /* ignore if the constraint is non-inheritable */ if (check[i].ccnoinherit) continue; - /* adjust varattnos of ccbin here */ - expr = stringToNode(check[i].ccbin); - change_varattnos_of_a_node(expr, newattno); + /* Adjust Vars to match new table's column numbering */ + expr = map_variable_attnos(stringToNode(check[i].ccbin), + 1, 0, + newattno, tupleDesc->natts, + &found_whole_row); + + /* + * For the moment we have to reject whole-row variables. + * We could convert them, if we knew the new table's rowtype + * OID, but that hasn't been assigned yet. + */ + if (found_whole_row) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot convert whole-row table reference"), + errdetail("Constraint \"%s\" contains a whole-row reference to table \"%s\".", + name, + RelationGetRelationName(relation)))); /* check for duplicate */ if (!MergeCheckConstraint(constraints, name, expr)) @@ -1866,101 +1874,6 @@ MergeCheckConstraint(List *constraints, char *name, Node *expr) } -/* - * Replace varattno values in an expression tree according to the given - * map array, that is, varattno N is replaced by newattno[N-1]. It is - * caller's responsibility to ensure that the array is long enough to - * define values for all user varattnos present in the tree. System column - * attnos remain unchanged. - * - * Note that the passed node tree is modified in-place! - */ -void -change_varattnos_of_a_node(Node *node, const AttrNumber *newattno) -{ - /* no setup needed, so away we go */ - (void) change_varattnos_walker(node, newattno); -} - -static bool -change_varattnos_walker(Node *node, const AttrNumber *newattno) -{ - if (node == NULL) - return false; - if (IsA(node, Var)) - { - Var *var = (Var *) node; - - if (var->varlevelsup == 0 && var->varno == 1 && - var->varattno > 0) - { - /* - * ??? the following may be a problem when the node is multiply - * referenced though stringToNode() doesn't create such a node - * currently. - */ - Assert(newattno[var->varattno - 1] > 0); - var->varattno = var->varoattno = newattno[var->varattno - 1]; - } - return false; - } - return expression_tree_walker(node, change_varattnos_walker, - (void *) newattno); -} - -/* - * Generate a map for change_varattnos_of_a_node from old and new TupleDesc's, - * matching according to column name. - */ -AttrNumber * -varattnos_map(TupleDesc olddesc, TupleDesc newdesc) -{ - AttrNumber *attmap; - int i, - j; - - attmap = (AttrNumber *) palloc0(sizeof(AttrNumber) * olddesc->natts); - for (i = 1; i <= olddesc->natts; i++) - { - if (olddesc->attrs[i - 1]->attisdropped) - continue; /* leave the entry as zero */ - - for (j = 1; j <= newdesc->natts; j++) - { - if (strcmp(NameStr(olddesc->attrs[i - 1]->attname), - NameStr(newdesc->attrs[j - 1]->attname)) == 0) - { - attmap[i - 1] = j; - break; - } - } - } - return attmap; -} - -/* - * Generate a map for change_varattnos_of_a_node from a TupleDesc and a list - * of ColumnDefs - */ -AttrNumber * -varattnos_map_schema(TupleDesc old, List *schema) -{ - AttrNumber *attmap; - int i; - - attmap = (AttrNumber *) palloc0(sizeof(AttrNumber) * old->natts); - for (i = 1; i <= old->natts; i++) - { - if (old->attrs[i - 1]->attisdropped) - continue; /* leave the entry as zero */ - - attmap[i - 1] = findAttrByName(NameStr(old->attrs[i - 1]->attname), - schema); - } - return attmap; -} - - /* * StoreCatalogInheritance * Updates the system catalogs with proper inheritance information. diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c index adc0d5b775..55a74eeaf4 100644 --- a/src/backend/parser/parse_utilcmd.c +++ b/src/backend/parser/parse_utilcmd.c @@ -108,7 +108,8 @@ static void transformOfType(CreateStmtContext *cxt, TypeName *ofTypename); static char *chooseIndexName(const RangeVar *relation, IndexStmt *index_stmt); static IndexStmt *generateClonedIndexStmt(CreateStmtContext *cxt, - Relation parent_index, AttrNumber *attmap); + Relation source_idx, + const AttrNumber *attmap, int attmap_length); static List *get_collation(Oid collation, Oid actual_datatype); static List *get_opclass(Oid opclass, Oid actual_datatype); static void transformIndexConstraints(CreateStmtContext *cxt); @@ -634,6 +635,7 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla Relation relation; TupleDesc tupleDesc; TupleConstr *constr; + AttrNumber *attmap; AclResult aclresult; char *comment; ParseCallbackState pcbstate; @@ -642,14 +644,14 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla relation = relation_openrv(table_like_clause->relation, AccessShareLock); - if (relation->rd_rel->relkind != RELKIND_RELATION - && relation->rd_rel->relkind != RELKIND_VIEW - && relation->rd_rel->relkind != RELKIND_FOREIGN_TABLE - && relation->rd_rel->relkind != RELKIND_COMPOSITE_TYPE) + if (relation->rd_rel->relkind != RELKIND_RELATION && + relation->rd_rel->relkind != RELKIND_VIEW && + relation->rd_rel->relkind != RELKIND_COMPOSITE_TYPE && + relation->rd_rel->relkind != RELKIND_FOREIGN_TABLE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("\"%s\" is not a table, view, composite type, or foreign table", - table_like_clause->relation->relname))); + RelationGetRelationName(relation)))); cancel_parser_errposition_callback(&pcbstate); @@ -676,6 +678,13 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla tupleDesc = RelationGetDescr(relation); constr = tupleDesc->constr; + /* + * Initialize column number map for map_variable_attnos(). We need this + * since dropped columns in the source table aren't copied, so the new + * table can have different column numbers. + */ + attmap = (AttrNumber *) palloc0(sizeof(AttrNumber) * tupleDesc->natts); + /* * Insert the copied attributes into the cxt for the new table definition. */ @@ -687,7 +696,7 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla ColumnDef *def; /* - * Ignore dropped columns in the parent. + * Ignore dropped columns in the parent. attmap entry is left zero. */ if (attribute->attisdropped) continue; @@ -718,6 +727,8 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla */ cxt->columns = lappend(cxt->columns, def); + attmap[parent_attno - 1] = list_length(cxt->columns); + /* * Copy default, if present and the default has been requested */ @@ -776,22 +787,39 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla /* * Copy CHECK constraints if requested, being careful to adjust attribute - * numbers + * numbers so they match the child. */ if ((table_like_clause->options & CREATE_TABLE_LIKE_CONSTRAINTS) && tupleDesc->constr) { - AttrNumber *attmap = varattnos_map_schema(tupleDesc, cxt->columns); int ccnum; for (ccnum = 0; ccnum < tupleDesc->constr->num_check; ccnum++) { char *ccname = tupleDesc->constr->check[ccnum].ccname; char *ccbin = tupleDesc->constr->check[ccnum].ccbin; - Node *ccbin_node = stringToNode(ccbin); Constraint *n = makeNode(Constraint); + Node *ccbin_node; + bool found_whole_row; - change_varattnos_of_a_node(ccbin_node, attmap); + ccbin_node = map_variable_attnos(stringToNode(ccbin), + 1, 0, + attmap, tupleDesc->natts, + &found_whole_row); + + /* + * We reject whole-row variables because the whole point of LIKE + * is that the new table's rowtype might later diverge from the + * parent's. So, while translation might be possible right now, + * it wouldn't be possible to guarantee it would work in future. + */ + if (found_whole_row) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot convert whole-row table reference"), + errdetail("Constraint \"%s\" contains a whole-row reference to table \"%s\".", + ccname, + RelationGetRelationName(relation)))); n->contype = CONSTR_CHECK; n->location = -1; @@ -827,7 +855,6 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla if ((table_like_clause->options & CREATE_TABLE_LIKE_INDEXES) && relation->rd_rel->relhasindex) { - AttrNumber *attmap = varattnos_map_schema(tupleDesc, cxt->columns); List *parent_indexes; ListCell *l; @@ -842,7 +869,8 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla parent_index = index_open(parent_index_oid, AccessShareLock); /* Build CREATE INDEX statement to recreate the parent_index */ - index_stmt = generateClonedIndexStmt(cxt, parent_index, attmap); + index_stmt = generateClonedIndexStmt(cxt, parent_index, + attmap, tupleDesc->natts); /* Copy comment on index */ if (table_like_clause->options & CREATE_TABLE_LIKE_COMMENTS) @@ -961,7 +989,7 @@ chooseIndexName(const RangeVar *relation, IndexStmt *index_stmt) */ static IndexStmt * generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx, - AttrNumber *attmap) + const AttrNumber *attmap, int attmap_length) { Oid source_relid = RelationGetRelid(source_idx); Form_pg_attribute *attrs = RelationGetDescr(source_idx)->attrs; @@ -1149,14 +1177,26 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx, { /* Expressional index */ Node *indexkey; + bool found_whole_row; if (indexpr_item == NULL) elog(ERROR, "too few entries in indexprs list"); indexkey = (Node *) lfirst(indexpr_item); indexpr_item = lnext(indexpr_item); - /* OK to modify indexkey since we are working on a private copy */ - change_varattnos_of_a_node(indexkey, attmap); + /* Adjust Vars to match new table's column numbering */ + indexkey = map_variable_attnos(indexkey, + 1, 0, + attmap, attmap_length, + &found_whole_row); + + /* As in transformTableLikeClause, reject whole-row variables */ + if (found_whole_row) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot convert whole-row table reference"), + errdetail("Index \"%s\" contains a whole-row table reference.", + RelationGetRelationName(source_idx)))); iparam->name = NULL; iparam->expr = indexkey; @@ -1213,12 +1253,28 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx, if (!isnull) { char *pred_str; + Node *pred_tree; + bool found_whole_row; /* Convert text string to node tree */ pred_str = TextDatumGetCString(datum); - index->whereClause = (Node *) stringToNode(pred_str); - /* Adjust attribute numbers */ - change_varattnos_of_a_node(index->whereClause, attmap); + pred_tree = (Node *) stringToNode(pred_str); + + /* Adjust Vars to match new table's column numbering */ + pred_tree = map_variable_attnos(pred_tree, + 1, 0, + attmap, attmap_length, + &found_whole_row); + + /* As in transformTableLikeClause, reject whole-row variables */ + if (found_whole_row) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot convert whole-row table reference"), + errdetail("Index \"%s\" contains a whole-row table reference.", + RelationGetRelationName(source_idx)))); + + index->whereClause = pred_tree; } /* Clean up */ diff --git a/src/backend/rewrite/rewriteManip.c b/src/backend/rewrite/rewriteManip.c index ec6b5bf5a9..9c778efd1c 100644 --- a/src/backend/rewrite/rewriteManip.c +++ b/src/backend/rewrite/rewriteManip.c @@ -1217,6 +1217,119 @@ replace_rte_variables_mutator(Node *node, } +/* + * map_variable_attnos() finds all user-column Vars in an expression tree + * that reference a particular RTE, and adjusts their varattnos according + * to the given mapping array (varattno n is replaced by attno_map[n-1]). + * Vars for system columns are not modified. + * + * A zero in the mapping array represents a dropped column, which should not + * appear in the expression. + * + * If the expression tree contains a whole-row Var for the target RTE, + * the Var is not changed but *found_whole_row is returned as TRUE. + * For most callers this is an error condition, but we leave it to the caller + * to report the error so that useful context can be provided. (In some + * usages it would be appropriate to modify the Var's vartype and insert a + * ConvertRowtypeExpr node to map back to the original vartype. We might + * someday extend this function's API to support that. For now, the only + * concession to that future need is that this function is a tree mutator + * not just a walker.) + * + * This could be built using replace_rte_variables and a callback function, + * but since we don't ever need to insert sublinks, replace_rte_variables is + * overly complicated. + */ + +typedef struct +{ + int target_varno; /* RTE index to search for */ + int sublevels_up; /* (current) nesting depth */ + const AttrNumber *attno_map; /* map array for user attnos */ + int map_length; /* number of entries in attno_map[] */ + bool *found_whole_row; /* output flag */ +} map_variable_attnos_context; + +static Node * +map_variable_attnos_mutator(Node *node, + map_variable_attnos_context *context) +{ + if (node == NULL) + return NULL; + if (IsA(node, Var)) + { + Var *var = (Var *) node; + + if (var->varno == context->target_varno && + var->varlevelsup == context->sublevels_up) + { + /* Found a matching variable, make the substitution */ + Var *newvar = (Var *) palloc(sizeof(Var)); + int attno = var->varattno; + + *newvar = *var; + if (attno > 0) + { + /* user-defined column, replace attno */ + if (attno > context->map_length || + context->attno_map[attno - 1] == 0) + elog(ERROR, "unexpected varattno %d in expression to be mapped", + attno); + newvar->varattno = newvar->varoattno = context->attno_map[attno - 1]; + } + else if (attno == 0) + { + /* whole-row variable, warn caller */ + *(context->found_whole_row) = true; + } + return (Node *) newvar; + } + /* otherwise fall through to copy the var normally */ + } + else if (IsA(node, Query)) + { + /* Recurse into RTE subquery or not-yet-planned sublink subquery */ + Query *newnode; + + context->sublevels_up++; + newnode = query_tree_mutator((Query *) node, + map_variable_attnos_mutator, + (void *) context, + 0); + context->sublevels_up--; + return (Node *) newnode; + } + return expression_tree_mutator(node, map_variable_attnos_mutator, + (void *) context); +} + +Node * +map_variable_attnos(Node *node, + int target_varno, int sublevels_up, + const AttrNumber *attno_map, int map_length, + bool *found_whole_row) +{ + map_variable_attnos_context context; + + context.target_varno = target_varno; + context.sublevels_up = sublevels_up; + context.attno_map = attno_map; + context.map_length = map_length; + context.found_whole_row = found_whole_row; + + *found_whole_row = false; + + /* + * Must be prepared to start with a Query or a bare expression tree; if + * it's a Query, we don't want to increment sublevels_up. + */ + return query_or_expression_tree_mutator(node, + map_variable_attnos_mutator, + (void *) &context, + 0); +} + + /* * ResolveNew - replace Vars with corresponding items from a targetlist * diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h index 9ceb086f68..15d4713cec 100644 --- a/src/include/commands/tablecmds.h +++ b/src/include/commands/tablecmds.h @@ -61,10 +61,6 @@ extern void find_composite_type_dependencies(Oid typeOid, extern void check_of_type(HeapTuple typetuple); -extern AttrNumber *varattnos_map(TupleDesc olddesc, TupleDesc newdesc); -extern AttrNumber *varattnos_map_schema(TupleDesc old, List *schema); -extern void change_varattnos_of_a_node(Node *node, const AttrNumber *newattno); - extern void register_on_commit_action(Oid relid, OnCommitAction action); extern void remove_on_commit_action(Oid relid); diff --git a/src/include/rewrite/rewriteManip.h b/src/include/rewrite/rewriteManip.h index 7226187849..6f57b37b81 100644 --- a/src/include/rewrite/rewriteManip.h +++ b/src/include/rewrite/rewriteManip.h @@ -65,6 +65,11 @@ extern Node *replace_rte_variables(Node *node, extern Node *replace_rte_variables_mutator(Node *node, replace_rte_variables_context *context); +extern Node *map_variable_attnos(Node *node, + int target_varno, int sublevels_up, + const AttrNumber *attno_map, int map_length, + bool *found_whole_row); + extern Node *ResolveNew(Node *node, int target_varno, int sublevels_up, RangeTblEntry *target_rte, List *targetlist, int event, int update_varno,