Improve handling of domains over arrays.

This patch eliminates various bizarre behaviors caused by sloppy thinking
about the difference between a domain type and its underlying array type.
In particular, the operation of updating one element of such an array
has to be considered as yielding a value of the underlying array type,
*not* a value of the domain, because there's no assurance that the
domain's CHECK constraints are still satisfied.  If we're intending to
store the result back into a domain column, we have to re-cast to the
domain type so that constraints are re-checked.

For similar reasons, such a domain can't be blindly matched to an ANYARRAY
polymorphic parameter, because the polymorphic function is likely to apply
array-ish operations that could invalidate the domain constraints.  For the
moment, we just forbid such matching.  We might later wish to insert an
automatic downcast to the underlying array type, but such a change should
also change matching of domains to ANYELEMENT for consistency.

To ensure that all such logic is rechecked, this patch removes the original
hack of setting a domain's pg_type.typelem field to match its base type;
the typelem will always be zero instead.  In those places where it's really
okay to look through the domain type with no other logic changes, use the
newly added get_base_element_type function in place of get_element_type.
catversion bumped due to change in pg_type contents.

Per bug #5717 from Richard Huxton and subsequent discussion.
This commit is contained in:
Tom Lane 2010-10-21 16:07:17 -04:00
parent 572ab1a542
commit 529cb267a6
21 changed files with 439 additions and 101 deletions

View File

@ -5698,9 +5698,8 @@
<entry></entry>
<entry><para>
<structfield>typndims</structfield> is the number of array dimensions
for a domain that is an array (that is, <structfield>typbasetype</> is
an array type; the domain's <structfield>typelem</> will match the base
type's <structfield>typelem</structfield>).
for a domain over an array (that is, <structfield>typbasetype</> is
an array type).
Zero for types other than domains over array types.
</para></entry>
</row>

View File

@ -1657,6 +1657,23 @@ CreateCast(CreateCastStmt *stmt)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("array data types are not binary-compatible")));
/*
* We also disallow creating binary-compatibility casts involving
* domains. Casting from a domain to its base type is already
* allowed, and casting the other way ought to go through domain
* coercion to permit constraint checking. Again, if you're intent on
* having your own semantics for that, create a no-op cast function.
*
* NOTE: if we were to relax this, the above checks for composites
* etc. would have to be modified to look through domains to their
* base types.
*/
if (sourcetyptype == TYPTYPE_DOMAIN ||
targettyptype == TYPTYPE_DOMAIN)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("domain data types must not be marked binary-compatible")));
}
/*

View File

@ -525,14 +525,12 @@ DefineType(List *names, List *parameters)
/*
* now have TypeCreate do all the real work.
*
* Note: the pg_type.oid is stored in user tables as array elements (base
* types) in ArrayType and in composite types in DatumTupleFields. This
* oid must be preserved by binary upgrades.
*/
typoid =
/*
* The pg_type.oid is stored in user tables as array elements (base types)
* in ArrayType and in composite types in DatumTupleFields. This oid must
* be preserved by binary upgrades.
*/
TypeCreate(InvalidOid, /* no predetermined type OID */
typeName, /* type name */
typeNamespace, /* namespace */
@ -746,7 +744,6 @@ DefineDomain(CreateDomainStmt *stmt)
Oid sendProcedure;
Oid analyzeProcedure;
bool byValue;
Oid typelem;
char category;
char delimiter;
char alignment;
@ -831,9 +828,6 @@ DefineDomain(CreateDomainStmt *stmt)
/* Type Category */
category = baseType->typcategory;
/* Array element type (in case base type is an array) */
typelem = baseType->typelem;
/* Array element Delimiter */
delimiter = baseType->typdelim;
@ -1033,7 +1027,7 @@ DefineDomain(CreateDomainStmt *stmt)
InvalidOid, /* typmodin procedure - none */
InvalidOid, /* typmodout procedure - none */
analyzeProcedure, /* analyze procedure */
typelem, /* element type ID */
InvalidOid, /* no array element type */
false, /* this isn't an array */
InvalidOid, /* no arrays for domains (yet) */
basetypeoid, /* base type ID */
@ -1670,7 +1664,7 @@ AlterDomainDefault(List *names, Node *defaultRaw)
typTup->typmodin,
typTup->typmodout,
typTup->typanalyze,
typTup->typelem,
InvalidOid,
false, /* a domain isn't an implicit array */
typTup->typbasetype,
defaultExpr,

View File

@ -1908,6 +1908,9 @@ find_coercion_pathway(Oid targetTypeId, Oid sourceTypeId,
* array types. If so, and if the element types have a suitable cast,
* report that we can coerce with an ArrayCoerceExpr.
*
* Note that the source type can be a domain over array, but not the
* target, because ArrayCoerceExpr won't check domain constraints.
*
* Hack: disallow coercions to oidvector and int2vector, which
* otherwise tend to capture coercions that should go to "real" array
* types. We want those types to be considered "real" arrays for many
@ -1921,7 +1924,7 @@ find_coercion_pathway(Oid targetTypeId, Oid sourceTypeId,
Oid sourceElem;
if ((targetElem = get_element_type(targetTypeId)) != InvalidOid &&
(sourceElem = get_element_type(sourceTypeId)) != InvalidOid)
(sourceElem = get_base_element_type(sourceTypeId)) != InvalidOid)
{
CoercionPathType elempathtype;
Oid elemfuncid;
@ -2001,10 +2004,8 @@ find_typmod_coercion_function(Oid typeId,
targetType = typeidType(typeId);
typeForm = (Form_pg_type) GETSTRUCT(targetType);
/* Check for a varlena array type (and not a domain) */
if (typeForm->typelem != InvalidOid &&
typeForm->typlen == -1 &&
typeForm->typtype != TYPTYPE_DOMAIN)
/* Check for a varlena array type */
if (typeForm->typelem != InvalidOid && typeForm->typlen == -1)
{
/* Yes, switch our attention to the element type */
typeId = typeForm->typelem;

View File

@ -161,19 +161,17 @@ transformExpr(ParseState *pstate, Node *expr)
targetType = typenameTypeId(pstate, tc->typeName,
&targetTypmod);
/*
* If target is a domain over array, work with the base
* array type here. transformTypeCast below will cast the
* array type to the domain. In the usual case that the
* target is not a domain, transformTypeCast is a no-op.
*/
targetType = getBaseTypeAndTypmod(targetType,
&targetTypmod);
elementType = get_element_type(targetType);
if (OidIsValid(elementType))
{
/*
* tranformArrayExpr doesn't know how to check domain
* constraints, so ask it to return the base type
* instead. transformTypeCast below will cast it to
* the domain. In the usual case that the target is
* not a domain, transformTypeCast is a no-op.
*/
targetType = getBaseTypeAndTypmod(targetType,
&targetTypmod);
tc = copyObject(tc);
tc->arg = transformArrayExpr(pstate,
(A_ArrayExpr *) tc->arg,

View File

@ -25,6 +25,7 @@
#include "parser/parse_relation.h"
#include "utils/builtins.h"
#include "utils/int8.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
#include "utils/varbit.h"
@ -198,19 +199,35 @@ make_var(ParseState *pstate, RangeTblEntry *rte, int attrno, int location)
/*
* transformArrayType()
* Get the element type of an array type in preparation for subscripting
* Identify the types involved in a subscripting operation
*
* On entry, arrayType/arrayTypmod identify the type of the input value
* to be subscripted (which could be a domain type). These are modified
* if necessary to identify the actual array type and typmod, and the
* array's element type is returned. An error is thrown if the input isn't
* an array type.
*/
Oid
transformArrayType(Oid arrayType)
transformArrayType(Oid *arrayType, int32 *arrayTypmod)
{
Oid origArrayType = *arrayType;
Oid elementType;
HeapTuple type_tuple_array;
Form_pg_type type_struct_array;
/*
* If the input is a domain, smash to base type, and extract the actual
* typmod to be applied to the base type. Subscripting a domain is an
* operation that necessarily works on the base array type, not the domain
* itself. (Note that we provide no method whereby the creator of a
* domain over an array type could hide its ability to be subscripted.)
*/
*arrayType = getBaseTypeAndTypmod(*arrayType, arrayTypmod);
/* Get the type tuple for the array */
type_tuple_array = SearchSysCache1(TYPEOID, ObjectIdGetDatum(arrayType));
type_tuple_array = SearchSysCache1(TYPEOID, ObjectIdGetDatum(*arrayType));
if (!HeapTupleIsValid(type_tuple_array))
elog(ERROR, "cache lookup failed for type %u", arrayType);
elog(ERROR, "cache lookup failed for type %u", *arrayType);
type_struct_array = (Form_pg_type) GETSTRUCT(type_tuple_array);
/* needn't check typisdefined since this will fail anyway */
@ -220,7 +237,7 @@ transformArrayType(Oid arrayType)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("cannot subscript type %s because it is not an array",
format_type_be(arrayType))));
format_type_be(origArrayType))));
ReleaseSysCache(type_tuple_array);
@ -241,13 +258,17 @@ transformArrayType(Oid arrayType)
* that array. We produce an expression that represents the new array value
* with the source data inserted into the right part of the array.
*
* For both cases, if the source array is of a domain-over-array type,
* the result is of the base array type or its element type; essentially,
* we must fold a domain to its base type before applying subscripting.
*
* pstate Parse state
* arrayBase Already-transformed expression for the array as a whole
* arrayType OID of array's datatype (should match type of arrayBase)
* arrayType OID of array's datatype (should match type of arrayBase,
* or be the base type of arrayBase's domain type)
* elementType OID of array's element type (fetch with transformArrayType,
* or pass InvalidOid to do it here)
* elementTypMod typmod to be applied to array elements (if storing) or of
* the source array (if fetching)
* arrayTypMod typmod for the array (which is also typmod for the elements)
* indirection Untransformed list of subscripts (must not be NIL)
* assignFrom NULL for array fetch, else transformed expression for source.
*/
@ -256,7 +277,7 @@ transformArraySubscripts(ParseState *pstate,
Node *arrayBase,
Oid arrayType,
Oid elementType,
int32 elementTypMod,
int32 arrayTypMod,
List *indirection,
Node *assignFrom)
{
@ -266,9 +287,13 @@ transformArraySubscripts(ParseState *pstate,
ListCell *idx;
ArrayRef *aref;
/* Caller may or may not have bothered to determine elementType */
/*
* Caller may or may not have bothered to determine elementType. Note
* that if the caller did do so, arrayType/arrayTypMod must be as
* modified by transformArrayType, ie, smash domain to base type.
*/
if (!OidIsValid(elementType))
elementType = transformArrayType(arrayType);
elementType = transformArrayType(&arrayType, &arrayTypMod);
/*
* A list containing only single subscripts refers to a single array
@ -356,7 +381,7 @@ transformArraySubscripts(ParseState *pstate,
newFrom = coerce_to_target_type(pstate,
assignFrom, typesource,
typeneeded, elementTypMod,
typeneeded, arrayTypMod,
COERCION_ASSIGNMENT,
COERCE_IMPLICIT_CAST,
-1);
@ -378,7 +403,7 @@ transformArraySubscripts(ParseState *pstate,
aref = makeNode(ArrayRef);
aref->refarraytype = arrayType;
aref->refelemtype = elementType;
aref->reftypmod = elementTypMod;
aref->reftypmod = arrayTypMod;
aref->refupperindexpr = upperIndexpr;
aref->reflowerindexpr = lowerIndexpr;
aref->refexpr = (Expr *) arrayBase;

View File

@ -209,7 +209,7 @@ get_sort_group_operators(Oid argtype,
eq_opr == ARRAY_EQ_OP ||
gt_opr == ARRAY_GT_OP)
{
Oid elem_type = get_element_type(argtype);
Oid elem_type = get_base_element_type(argtype);
if (OidIsValid(elem_type))
{
@ -906,7 +906,7 @@ make_scalar_array_op(ParseState *pstate, List *opname,
rtypeId = UNKNOWNOID;
else
{
rtypeId = get_element_type(atypeId);
rtypeId = get_base_element_type(atypeId);
if (!OidIsValid(rtypeId))
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),

View File

@ -43,6 +43,16 @@ static Node *transformAssignmentIndirection(ParseState *pstate,
ListCell *indirection,
Node *rhs,
int location);
static Node *transformAssignmentSubscripts(ParseState *pstate,
Node *basenode,
const char *targetName,
Oid targetTypeId,
int32 targetTypMod,
List *subscripts,
bool isSlice,
ListCell *next_indirection,
Node *rhs,
int location);
static List *ExpandColumnRefStar(ParseState *pstate, ColumnRef *cref,
bool targetlist);
static List *ExpandAllTables(ParseState *pstate, int location);
@ -613,27 +623,17 @@ transformAssignmentIndirection(ParseState *pstate,
/* process subscripts before this field selection */
if (subscripts)
{
Oid elementTypeId = transformArrayType(targetTypeId);
Oid typeNeeded = isSlice ? targetTypeId : elementTypeId;
/* recurse to create appropriate RHS for array assign */
rhs = transformAssignmentIndirection(pstate,
NULL,
/* recurse, and then return because we're done */
return transformAssignmentSubscripts(pstate,
basenode,
targetName,
true,
typeNeeded,
targetTypeId,
targetTypMod,
subscripts,
isSlice,
i,
rhs,
location);
/* process subscripts */
return (Node *) transformArraySubscripts(pstate,
basenode,
targetTypeId,
elementTypeId,
targetTypMod,
subscripts,
rhs);
}
/* No subscripts, so can process field selection here */
@ -690,27 +690,17 @@ transformAssignmentIndirection(ParseState *pstate,
/* process trailing subscripts, if any */
if (subscripts)
{
Oid elementTypeId = transformArrayType(targetTypeId);
Oid typeNeeded = isSlice ? targetTypeId : elementTypeId;
/* recurse to create appropriate RHS for array assign */
rhs = transformAssignmentIndirection(pstate,
NULL,
/* recurse, and then return because we're done */
return transformAssignmentSubscripts(pstate,
basenode,
targetName,
true,
typeNeeded,
targetTypeId,
targetTypMod,
subscripts,
isSlice,
NULL,
rhs,
location);
/* process subscripts */
return (Node *) transformArraySubscripts(pstate,
basenode,
targetTypeId,
elementTypeId,
targetTypMod,
subscripts,
rhs);
}
/* base case: just coerce RHS to match target type ID */
@ -748,6 +738,79 @@ transformAssignmentIndirection(ParseState *pstate,
return result;
}
/*
* helper for transformAssignmentIndirection: process array assignment
*/
static Node *
transformAssignmentSubscripts(ParseState *pstate,
Node *basenode,
const char *targetName,
Oid targetTypeId,
int32 targetTypMod,
List *subscripts,
bool isSlice,
ListCell *next_indirection,
Node *rhs,
int location)
{
Node *result;
Oid arrayType;
int32 arrayTypMod;
Oid elementTypeId;
Oid typeNeeded;
Assert(subscripts != NIL);
/* Identify the actual array type and element type involved */
arrayType = targetTypeId;
arrayTypMod = targetTypMod;
elementTypeId = transformArrayType(&arrayType, &arrayTypMod);
/* Identify type that RHS must provide */
typeNeeded = isSlice ? arrayType : elementTypeId;
/* recurse to create appropriate RHS for array assign */
rhs = transformAssignmentIndirection(pstate,
NULL,
targetName,
true,
typeNeeded,
arrayTypMod,
next_indirection,
rhs,
location);
/* process subscripts */
result = (Node *) transformArraySubscripts(pstate,
basenode,
arrayType,
elementTypeId,
arrayTypMod,
subscripts,
rhs);
/* If target was a domain over array, need to coerce up to the domain */
if (arrayType != targetTypeId)
{
result = coerce_to_target_type(pstate,
result, exprType(result),
targetTypeId, targetTypMod,
COERCION_ASSIGNMENT,
COERCE_IMPLICIT_CAST,
-1);
/* probably shouldn't fail, but check */
if (result == NULL)
ereport(ERROR,
(errcode(ERRCODE_CANNOT_COERCE),
errmsg("cannot cast type %s to %s",
format_type_be(exprType(result)),
format_type_be(targetTypeId)),
parser_errposition(pstate, location)));
}
return result;
}
/*
* checkInsertTargets -

View File

@ -134,18 +134,16 @@ format_type_internal(Oid type_oid, int32 typemod,
typeform = (Form_pg_type) GETSTRUCT(tuple);
/*
* Check if it's an array (and not a domain --- we don't want to show the
* substructure of a domain type). Fixed-length array types such as
* "name" shouldn't get deconstructed either. As of Postgres 8.1, rather
* than checking typlen we check the toast property, and don't deconstruct
* "plain storage" array types --- this is because we don't want to show
* oidvector as oid[].
* Check if it's a regular (variable length) array type. Fixed-length
* array types such as "name" shouldn't get deconstructed. As of Postgres
* 8.1, rather than checking typlen we check the toast property, and don't
* deconstruct "plain storage" array types --- this is because we don't
* want to show oidvector as oid[].
*/
array_base_type = typeform->typelem;
if (array_base_type != InvalidOid &&
typeform->typstorage != 'p' &&
typeform->typtype != TYPTYPE_DOMAIN)
typeform->typstorage != 'p')
{
/* Switch our attention to the array element type */
ReleaseSysCache(tuple);

View File

@ -4850,7 +4850,7 @@ get_rule_expr(Node *node, deparse_context *context,
appendStringInfo(buf, " %s %s (",
generate_operator_name(expr->opno,
exprType(arg1),
get_element_type(exprType(arg2))),
get_base_element_type(exprType(arg2))),
expr->useOr ? "ANY" : "ALL");
get_rule_expr_paren(arg2, context, true, node);
appendStringInfoChar(buf, ')');

View File

@ -1704,7 +1704,7 @@ scalararraysel(PlannerInfo *root,
rightop = (Node *) lsecond(clause->args);
/* get nominal (after relabeling) element type of rightop */
nominal_element_type = get_element_type(exprType(rightop));
nominal_element_type = get_base_element_type(exprType(rightop));
if (!OidIsValid(nominal_element_type))
return (Selectivity) 0.5; /* probably shouldn't happen */

View File

@ -1615,7 +1615,7 @@ map_xml_name_to_sql_identifier(char *name)
char *
map_sql_value_to_xml_value(Datum value, Oid type, bool xml_escape_strings)
{
if (type_is_array(type))
if (type_is_array_domain(type))
{
ArrayType *array;
Oid elmtype;

View File

@ -2212,6 +2212,52 @@ get_array_type(Oid typid)
return result;
}
/*
* get_base_element_type
* Given the type OID, get the typelem, looking "through" any domain
* to its underlying array type.
*
* This is equivalent to get_element_type(getBaseType(typid)), but avoids
* an extra cache lookup. Note that it fails to provide any information
* about the typmod of the array.
*/
Oid
get_base_element_type(Oid typid)
{
/*
* We loop to find the bottom base type in a stack of domains.
*/
for (;;)
{
HeapTuple tup;
Form_pg_type typTup;
tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typid));
if (!HeapTupleIsValid(tup))
break;
typTup = (Form_pg_type) GETSTRUCT(tup);
if (typTup->typtype != TYPTYPE_DOMAIN)
{
/* Not a domain, so stop descending */
Oid result;
/* This test must match get_element_type */
if (typTup->typlen == -1)
result = typTup->typelem;
else
result = InvalidOid;
ReleaseSysCache(tup);
return result;
}
typid = typTup->typbasetype;
ReleaseSysCache(tup);
}
/* Like get_element_type, silently return InvalidOid for bogus input */
return InvalidOid;
}
/*
* getTypeInputInfo
*

View File

@ -2326,10 +2326,10 @@ get_call_expr_argtype(Node *expr, int argnum)
*/
if (IsA(expr, ScalarArrayOpExpr) &&
argnum == 1)
argtype = get_element_type(argtype);
argtype = get_base_element_type(argtype);
else if (IsA(expr, ArrayCoerceExpr) &&
argnum == 0)
argtype = get_element_type(argtype);
argtype = get_base_element_type(argtype);
return argtype;
}

View File

@ -53,6 +53,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 201010151
#define CATALOG_VERSION_NO 201010201
#endif

View File

@ -189,8 +189,7 @@ CATALOG(pg_type,1247) BKI_BOOTSTRAP BKI_ROWTYPE_OID(71) BKI_SCHEMA_MACRO
/*
* typndims is the declared number of dimensions for an array domain type
* (i.e., typbasetype is an array type; the domain's typelem will match
* the base type's typelem). Otherwise zero.
* (i.e., typbasetype is an array type). Otherwise zero.
*/
int4 typndims;

View File

@ -139,12 +139,12 @@ extern void cancel_parser_errposition_callback(ParseCallbackState *pcbstate);
extern Var *make_var(ParseState *pstate, RangeTblEntry *rte, int attrno,
int location);
extern Oid transformArrayType(Oid arrayType);
extern Oid transformArrayType(Oid *arrayType, int32 *arrayTypmod);
extern ArrayRef *transformArraySubscripts(ParseState *pstate,
Node *arrayBase,
Oid arrayType,
Oid elementType,
int32 elementTypMod,
int32 arrayTypMod,
List *indirection,
Node *assignFrom);
extern Const *make_const(ParseState *pstate, Value *value, int location);

View File

@ -117,6 +117,7 @@ extern void get_type_category_preferred(Oid typid,
extern Oid get_typ_typrelid(Oid typid);
extern Oid get_element_type(Oid typid);
extern Oid get_array_type(Oid typid);
extern Oid get_base_element_type(Oid typid);
extern void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam);
extern void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena);
extern void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam);
@ -138,6 +139,8 @@ extern void free_attstatsslot(Oid atttype,
extern char *get_namespace_name(Oid nspid);
#define type_is_array(typid) (get_element_type(typid) != InvalidOid)
/* type_is_array_domain accepts both plain arrays and domains over arrays */
#define type_is_array_domain(typid) (get_base_element_type(typid) != InvalidOid)
#define TypeIsToastable(typid) (get_typstorage(typid) != 'p')

View File

@ -154,6 +154,7 @@ static void exec_assign_value(PLpgSQL_execstate *estate,
static void exec_eval_datum(PLpgSQL_execstate *estate,
PLpgSQL_datum *datum,
Oid *typeid,
int32 *typetypmod,
Datum *value,
bool *isnull);
static int exec_eval_integer(PLpgSQL_execstate *estate,
@ -3736,6 +3737,7 @@ exec_assign_value(PLpgSQL_execstate *estate,
bool oldarrayisnull;
Oid arraytypeid,
arrayelemtypeid;
int32 arraytypmod;
int16 arraytyplen,
elemtyplen;
bool elemtypbyval;
@ -3780,8 +3782,13 @@ exec_assign_value(PLpgSQL_execstate *estate,
/* Fetch current value of array datum */
exec_eval_datum(estate, target,
&arraytypeid, &oldarraydatum, &oldarrayisnull);
&arraytypeid, &arraytypmod,
&oldarraydatum, &oldarrayisnull);
/* If target is domain over array, reduce to base type */
arraytypeid = getBaseTypeAndTypmod(arraytypeid, &arraytypmod);
/* ... and identify the element type */
arrayelemtypeid = get_element_type(arraytypeid);
if (!OidIsValid(arrayelemtypeid))
ereport(ERROR,
@ -3831,7 +3838,7 @@ exec_assign_value(PLpgSQL_execstate *estate,
coerced_value = exec_simple_cast_value(value,
valtype,
arrayelemtypeid,
-1,
arraytypmod,
*isNull);
/*
@ -3875,7 +3882,9 @@ exec_assign_value(PLpgSQL_execstate *estate,
/*
* Assign the new array to the base variable. It's never NULL
* at this point.
* at this point. Note that if the target is a domain,
* coercing the base array type back up to the domain will
* happen within exec_assign_value.
*/
*isNull = false;
exec_assign_value(estate, target,
@ -3897,7 +3906,7 @@ exec_assign_value(PLpgSQL_execstate *estate,
/*
* exec_eval_datum Get current value of a PLpgSQL_datum
*
* The type oid, value in Datum format, and null flag are returned.
* The type oid, typmod, value in Datum format, and null flag are returned.
*
* At present this doesn't handle PLpgSQL_expr or PLpgSQL_arrayelem datums.
*
@ -3910,6 +3919,7 @@ static void
exec_eval_datum(PLpgSQL_execstate *estate,
PLpgSQL_datum *datum,
Oid *typeid,
int32 *typetypmod,
Datum *value,
bool *isnull)
{
@ -3922,6 +3932,7 @@ exec_eval_datum(PLpgSQL_execstate *estate,
PLpgSQL_var *var = (PLpgSQL_var *) datum;
*typeid = var->datatype->typoid;
*typetypmod = var->datatype->atttypmod;
*value = var->value;
*isnull = var->isnull;
break;
@ -3942,6 +3953,7 @@ exec_eval_datum(PLpgSQL_execstate *estate,
elog(ERROR, "row not compatible with its own tupdesc");
MemoryContextSwitchTo(oldcontext);
*typeid = row->rowtupdesc->tdtypeid;
*typetypmod = row->rowtupdesc->tdtypmod;
*value = HeapTupleGetDatum(tup);
*isnull = false;
break;
@ -3974,6 +3986,7 @@ exec_eval_datum(PLpgSQL_execstate *estate,
HeapTupleHeaderSetTypMod(worktup.t_data, rec->tupdesc->tdtypmod);
MemoryContextSwitchTo(oldcontext);
*typeid = rec->tupdesc->tdtypeid;
*typetypmod = rec->tupdesc->tdtypmod;
*value = HeapTupleGetDatum(&worktup);
*isnull = false;
break;
@ -3999,6 +4012,11 @@ exec_eval_datum(PLpgSQL_execstate *estate,
errmsg("record \"%s\" has no field \"%s\"",
rec->refname, recfield->fieldname)));
*typeid = SPI_gettypeid(rec->tupdesc, fno);
/* XXX there's no SPI_gettypmod, for some reason */
if (fno > 0)
*typetypmod = rec->tupdesc->attrs[fno - 1]->atttypmod;
else
*typetypmod = -1;
*value = SPI_getbinval(rec->tup, rec->tupdesc, fno, isnull);
break;
}
@ -4671,6 +4689,7 @@ plpgsql_param_fetch(ParamListInfo params, int paramid)
PLpgSQL_expr *expr;
PLpgSQL_datum *datum;
ParamExternData *prm;
int32 prmtypmod;
/* paramid's are 1-based, but dnos are 0-based */
dno = paramid - 1;
@ -4693,7 +4712,8 @@ plpgsql_param_fetch(ParamListInfo params, int paramid)
datum = estate->datums[dno];
prm = &params->params[dno];
exec_eval_datum(estate, datum,
&prm->ptype, &prm->value, &prm->isnull);
&prm->ptype, &prmtypmod,
&prm->value, &prm->isnull);
}
@ -4870,6 +4890,7 @@ make_tuple_from_row(PLpgSQL_execstate *estate,
for (i = 0; i < natts; i++)
{
Oid fieldtypeid;
int32 fieldtypmod;
if (tupdesc->attrs[i]->attisdropped)
{
@ -4880,9 +4901,11 @@ make_tuple_from_row(PLpgSQL_execstate *estate,
elog(ERROR, "dropped rowtype entry for non-dropped column");
exec_eval_datum(estate, estate->datums[row->varnos[i]],
&fieldtypeid, &dvalues[i], &nulls[i]);
&fieldtypeid, &fieldtypmod,
&dvalues[i], &nulls[i]);
if (fieldtypeid != tupdesc->attrs[i]->atttypid)
return NULL;
/* XXX should we insist on typmod match, too? */
}
tuple = heap_form_tuple(tupdesc, dvalues, nulls);

View File

@ -496,3 +496,102 @@ drop table ddtest2;
drop type ddtest1;
drop domain posint cascade;
NOTICE: drop cascades to type posint2
--
-- Check enforcement of domain-related typmod in plpgsql (bug #5717)
--
create or replace function array_elem_check(numeric) returns numeric as $$
declare
x numeric(4,2)[1];
begin
x[1] := $1;
return x[1];
end$$ language plpgsql;
select array_elem_check(121.00);
ERROR: numeric field overflow
DETAIL: A field with precision 4, scale 2 must round to an absolute value less than 10^2.
CONTEXT: PL/pgSQL function "array_elem_check" line 5 at assignment
select array_elem_check(1.23456);
array_elem_check
------------------
1.23
(1 row)
create domain mynums as numeric(4,2)[1];
create or replace function array_elem_check(numeric) returns numeric as $$
declare
x mynums;
begin
x[1] := $1;
return x[1];
end$$ language plpgsql;
select array_elem_check(121.00);
ERROR: numeric field overflow
DETAIL: A field with precision 4, scale 2 must round to an absolute value less than 10^2.
CONTEXT: PL/pgSQL function "array_elem_check" line 5 at assignment
select array_elem_check(1.23456);
array_elem_check
------------------
1.23
(1 row)
create domain mynums2 as mynums;
create or replace function array_elem_check(numeric) returns numeric as $$
declare
x mynums2;
begin
x[1] := $1;
return x[1];
end$$ language plpgsql;
select array_elem_check(121.00);
ERROR: numeric field overflow
DETAIL: A field with precision 4, scale 2 must round to an absolute value less than 10^2.
CONTEXT: PL/pgSQL function "array_elem_check" line 5 at assignment
select array_elem_check(1.23456);
array_elem_check
------------------
1.23
(1 row)
drop function array_elem_check(numeric);
--
-- Check enforcement of array-level domain constraints
--
create domain orderedpair as int[2] check (value[1] < value[2]);
select array[1,2]::orderedpair;
array
-------
{1,2}
(1 row)
select array[2,1]::orderedpair; -- fail
ERROR: value for domain orderedpair violates check constraint "orderedpair_check"
create temp table op (f1 orderedpair);
insert into op values (array[1,2]);
insert into op values (array[2,1]); -- fail
ERROR: value for domain orderedpair violates check constraint "orderedpair_check"
update op set f1[2] = 3;
update op set f1[2] = 0; -- fail
ERROR: value for domain orderedpair violates check constraint "orderedpair_check"
select * from op;
f1
-------
{1,3}
(1 row)
create or replace function array_elem_check(int) returns int as $$
declare
x orderedpair := '{1,2}';
begin
x[2] := $1;
return x[2];
end$$ language plpgsql;
select array_elem_check(3);
array_elem_check
------------------
3
(1 row)
select array_elem_check(-1);
ERROR: value for domain orderedpair violates check constraint "orderedpair_check"
CONTEXT: PL/pgSQL function "array_elem_check" line 5 at assignment
drop function array_elem_check(int);

View File

@ -393,3 +393,76 @@ alter domain posint add constraint c2 check(value > 0); -- OK
drop table ddtest2;
drop type ddtest1;
drop domain posint cascade;
--
-- Check enforcement of domain-related typmod in plpgsql (bug #5717)
--
create or replace function array_elem_check(numeric) returns numeric as $$
declare
x numeric(4,2)[1];
begin
x[1] := $1;
return x[1];
end$$ language plpgsql;
select array_elem_check(121.00);
select array_elem_check(1.23456);
create domain mynums as numeric(4,2)[1];
create or replace function array_elem_check(numeric) returns numeric as $$
declare
x mynums;
begin
x[1] := $1;
return x[1];
end$$ language plpgsql;
select array_elem_check(121.00);
select array_elem_check(1.23456);
create domain mynums2 as mynums;
create or replace function array_elem_check(numeric) returns numeric as $$
declare
x mynums2;
begin
x[1] := $1;
return x[1];
end$$ language plpgsql;
select array_elem_check(121.00);
select array_elem_check(1.23456);
drop function array_elem_check(numeric);
--
-- Check enforcement of array-level domain constraints
--
create domain orderedpair as int[2] check (value[1] < value[2]);
select array[1,2]::orderedpair;
select array[2,1]::orderedpair; -- fail
create temp table op (f1 orderedpair);
insert into op values (array[1,2]);
insert into op values (array[2,1]); -- fail
update op set f1[2] = 3;
update op set f1[2] = 0; -- fail
select * from op;
create or replace function array_elem_check(int) returns int as $$
declare
x orderedpair := '{1,2}';
begin
x[2] := $1;
return x[2];
end$$ language plpgsql;
select array_elem_check(3);
select array_elem_check(-1);
drop function array_elem_check(int);