Add 'missing_ok' argument to build_attrmap_by_name

When it's given as true, return a 0 in the position of the missing
column rather than raising an error.

This is currently unused, but it allows us to reimplement column
permission checking in a subsequent commit.  It seems worth breaking
into a separate commit because it affects unrelated code.

Author: Amit Langote <amitlangote09@gmail.com>
Discussion: https://postgr.es/m/CA+HiwqFfiai=qBxPDTjaio_ZcaqUKh+FC=prESrB8ogZgFNNNQ@mail.gmail.com
This commit is contained in:
Alvaro Herrera 2022-11-29 09:39:36 +01:00
parent 00ae5d6f58
commit ad86d159b6
No known key found for this signature in database
GPG Key ID: 1C20ACB9D5C564AE
10 changed files with 55 additions and 28 deletions

View File

@ -169,10 +169,15 @@ build_attrmap_by_position(TupleDesc indesc,
* and output columns by name. (Dropped columns are ignored in both input and
* output.) This is normally a subroutine for convert_tuples_by_name in
* tupconvert.c, but can be used standalone.
*
* If 'missing_ok' is true, a column from 'outdesc' not being present in
* 'indesc' is not flagged as an error; AttrMap.attnums[] entry for such an
* outdesc column will be 0 in that case.
*/
AttrMap *
build_attrmap_by_name(TupleDesc indesc,
TupleDesc outdesc)
TupleDesc outdesc,
bool missing_ok)
{
AttrMap *attrMap;
int outnatts;
@ -235,7 +240,7 @@ build_attrmap_by_name(TupleDesc indesc,
break;
}
}
if (attrMap->attnums[i] == 0)
if (attrMap->attnums[i] == 0 && !missing_ok)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("could not convert row type"),
@ -257,12 +262,13 @@ build_attrmap_by_name(TupleDesc indesc,
*/
AttrMap *
build_attrmap_by_name_if_req(TupleDesc indesc,
TupleDesc outdesc)
TupleDesc outdesc,
bool missing_ok)
{
AttrMap *attrMap;
/* Verify compatibility and prepare attribute-number map */
attrMap = build_attrmap_by_name(indesc, outdesc);
attrMap = build_attrmap_by_name(indesc, outdesc, missing_ok);
/* Check if the map has a one-to-one match */
if (check_attrmap_match(indesc, outdesc, attrMap))

View File

@ -107,7 +107,7 @@ convert_tuples_by_name(TupleDesc indesc,
int n = outdesc->natts;
/* Verify compatibility and prepare attribute-number map */
attrMap = build_attrmap_by_name_if_req(indesc, outdesc);
attrMap = build_attrmap_by_name_if_req(indesc, outdesc, false);
if (attrMap == NULL)
{

View File

@ -227,7 +227,8 @@ map_partition_varattnos(List *expr, int fromrel_varno,
bool found_whole_row;
part_attmap = build_attrmap_by_name(RelationGetDescr(to_rel),
RelationGetDescr(from_rel));
RelationGetDescr(from_rel),
false);
expr = (List *) map_variable_attnos((Node *) expr,
fromrel_varno, 0,
part_attmap,

View File

@ -1290,7 +1290,8 @@ DefineIndex(Oid relationId,
childidxs = RelationGetIndexList(childrel);
attmap =
build_attrmap_by_name(RelationGetDescr(childrel),
parentDesc);
parentDesc,
false);
foreach(cell, childidxs)
{

View File

@ -1206,7 +1206,8 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
}
attmap = build_attrmap_by_name(RelationGetDescr(rel),
RelationGetDescr(parent));
RelationGetDescr(parent),
false);
idxstmt =
generateClonedIndexStmt(NULL, idxRel,
attmap, &constraintOid);
@ -9647,7 +9648,8 @@ addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, Relation rel,
* definition to match the partition's column layout.
*/
map = build_attrmap_by_name_if_req(RelationGetDescr(partRel),
RelationGetDescr(pkrel));
RelationGetDescr(pkrel),
false);
if (map)
{
mapped_pkattnum = palloc(sizeof(AttrNumber) * numfks);
@ -9814,7 +9816,8 @@ addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, Relation rel,
CheckTableNotInUse(partition, "ALTER TABLE");
attmap = build_attrmap_by_name(RelationGetDescr(partition),
RelationGetDescr(rel));
RelationGetDescr(rel),
false);
for (int j = 0; j < numfks; j++)
mapped_fkattnum[j] = attmap->attnums[fkattnum[j] - 1];
@ -10022,7 +10025,8 @@ CloneFkReferenced(Relation parentRel, Relation partitionRel)
trigrel = table_open(TriggerRelationId, RowExclusiveLock);
attmap = build_attrmap_by_name(RelationGetDescr(partitionRel),
RelationGetDescr(parentRel));
RelationGetDescr(parentRel),
false);
foreach(cell, clone)
{
Oid constrOid = lfirst_oid(cell);
@ -10219,7 +10223,8 @@ CloneFkReferencing(List **wqueue, Relation parentRel, Relation partRel)
* different. This map is used to convert them.
*/
attmap = build_attrmap_by_name(RelationGetDescr(partRel),
RelationGetDescr(parentRel));
RelationGetDescr(parentRel),
false);
partFKs = copyObject(RelationGetFKeyList(partRel));
@ -12335,7 +12340,8 @@ ATPrepAlterColumnType(List **wqueue,
cmd = copyObject(cmd);
attmap = build_attrmap_by_name(RelationGetDescr(childrel),
RelationGetDescr(rel));
RelationGetDescr(rel),
false);
((ColumnDef *) cmd->def)->cooked_default =
map_variable_attnos(def->cooked_default,
1, 0,
@ -18043,7 +18049,8 @@ AttachPartitionEnsureIndexes(Relation rel, Relation attachrel)
/* construct an indexinfo to compare existing indexes against */
info = BuildIndexInfo(idxRel);
attmap = build_attrmap_by_name(RelationGetDescr(attachrel),
RelationGetDescr(rel));
RelationGetDescr(rel),
false);
constraintOid = get_relation_idx_constraint_oid(RelationGetRelid(rel), idx);
/*
@ -18981,7 +18988,8 @@ ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx, RangeVar *name)
childInfo = BuildIndexInfo(partIdx);
parentInfo = BuildIndexInfo(parentIdx);
attmap = build_attrmap_by_name(RelationGetDescr(partTbl),
RelationGetDescr(parentTbl));
RelationGetDescr(parentTbl),
false);
if (!CompareIndexInfo(childInfo, parentInfo,
partIdx->rd_indcollation,
parentIdx->rd_indcollation,

View File

@ -1859,7 +1859,7 @@ ExecPartitionCheckEmitError(ResultRelInfo *resultRelInfo,
old_tupdesc = RelationGetDescr(resultRelInfo->ri_RelationDesc);
/* a reverse map */
map = build_attrmap_by_name_if_req(old_tupdesc, tupdesc);
map = build_attrmap_by_name_if_req(old_tupdesc, tupdesc, false);
/*
* Partition-specific slot's tupdesc can't be changed, so allocate a
@ -1944,7 +1944,8 @@ ExecConstraints(ResultRelInfo *resultRelInfo,
tupdesc = RelationGetDescr(rootrel->ri_RelationDesc);
/* a reverse map */
map = build_attrmap_by_name_if_req(orig_tupdesc,
tupdesc);
tupdesc,
false);
/*
* Partition-specific slot's tupdesc can't be changed, so
@ -1996,7 +1997,8 @@ ExecConstraints(ResultRelInfo *resultRelInfo,
tupdesc = RelationGetDescr(rootrel->ri_RelationDesc);
/* a reverse map */
map = build_attrmap_by_name_if_req(old_tupdesc,
tupdesc);
tupdesc,
false);
/*
* Partition-specific slot's tupdesc can't be changed, so
@ -2103,7 +2105,8 @@ ExecWithCheckOptions(WCOKind kind, ResultRelInfo *resultRelInfo,
tupdesc = RelationGetDescr(rootrel->ri_RelationDesc);
/* a reverse map */
map = build_attrmap_by_name_if_req(old_tupdesc,
tupdesc);
tupdesc,
false);
/*
* Partition-specific slot's tupdesc can't be changed,

View File

@ -582,7 +582,8 @@ ExecInitPartitionInfo(ModifyTableState *mtstate, EState *estate,
*/
part_attmap =
build_attrmap_by_name(RelationGetDescr(partrel),
RelationGetDescr(firstResultRel));
RelationGetDescr(firstResultRel),
false);
wcoList = (List *)
map_variable_attnos((Node *) wcoList,
firstVarno, 0,
@ -639,7 +640,8 @@ ExecInitPartitionInfo(ModifyTableState *mtstate, EState *estate,
if (part_attmap == NULL)
part_attmap =
build_attrmap_by_name(RelationGetDescr(partrel),
RelationGetDescr(firstResultRel));
RelationGetDescr(firstResultRel),
false);
returningList = (List *)
map_variable_attnos((Node *) returningList,
firstVarno, 0,
@ -780,7 +782,8 @@ ExecInitPartitionInfo(ModifyTableState *mtstate, EState *estate,
if (part_attmap == NULL)
part_attmap =
build_attrmap_by_name(RelationGetDescr(partrel),
RelationGetDescr(firstResultRel));
RelationGetDescr(firstResultRel),
false);
onconflset = (List *)
map_variable_attnos((Node *) onconflset,
INNER_VAR, 0,
@ -878,7 +881,8 @@ ExecInitPartitionInfo(ModifyTableState *mtstate, EState *estate,
if (part_attmap == NULL)
part_attmap =
build_attrmap_by_name(RelationGetDescr(partrel),
RelationGetDescr(firstResultRel));
RelationGetDescr(firstResultRel),
false);
if (unlikely(!leaf_part_rri->ri_projectNewInfoValid))
ExecInitMergeTupleSlots(mtstate, leaf_part_rri);
@ -1147,7 +1151,8 @@ ExecInitPartitionDispatchInfo(EState *estate,
* routing.
*/
pd->tupmap = build_attrmap_by_name_if_req(RelationGetDescr(parent_pd->reldesc),
tupdesc);
tupdesc,
false);
pd->tupslot = pd->tupmap ?
MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual) : NULL;
}

View File

@ -1232,7 +1232,8 @@ expandTableLikeClause(RangeVar *heapRel, TableLikeClause *table_like_clause)
* have a failure since both tables are locked.
*/
attmap = build_attrmap_by_name(RelationGetDescr(childrel),
tupleDesc);
tupleDesc,
false);
/*
* Process defaults, if required.

View File

@ -1125,7 +1125,7 @@ init_tuple_slot(PGOutputData *data, Relation relation,
/* Map must live as long as the session does. */
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc);
entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
MemoryContextSwitchTo(oldctx);
RelationClose(ancestor);

View File

@ -42,9 +42,11 @@ extern void free_attrmap(AttrMap *map);
/* Conversion routines to build mappings */
extern AttrMap *build_attrmap_by_name(TupleDesc indesc,
TupleDesc outdesc);
TupleDesc outdesc,
bool missing_ok);
extern AttrMap *build_attrmap_by_name_if_req(TupleDesc indesc,
TupleDesc outdesc);
TupleDesc outdesc,
bool missing_ok);
extern AttrMap *build_attrmap_by_position(TupleDesc indesc,
TupleDesc outdesc,
const char *msg);