Clean up two rather nasty bugs in operator selection code.

1. If there is exactly one pg_operator entry of the right name and oprkind,
oper() and related routines would return that entry whether its input type
had anything to do with the request or not.  This is just premature
optimization: we shouldn't return the single candidate until after we verify
that it really is a valid candidate, ie, is at least coercion-compatible
with the given types.

2. oper() and related routines only promise a coercion-compatible result.
Unfortunately, there were quite a few callers that assumed the returned
operator is binary-compatible with the given datatype; they would proceed
to call it without making any datatype coercions.  These callers include
sorting, grouping, aggregation, and VACUUM ANALYZE.  In general I think
it is appropriate for these callers to require an exact or binary-compatible
match, so I've added a new routine compatible_oper() that only succeeds if
it can find an operator that doesn't require any run-time conversions.
Callers now call oper() or compatible_oper() depending on whether they are
prepared to deal with type conversion or not.

The upshot of these bugs is revealed by the following silliness in PL/Tcl's
selftest: it creates an operator @< on int4, and then tries to use it to
sort a char(N) column.  The system would let it do that :-( (and evidently
has done so since 6.3 :-( :-().  The result in this case was just a silly
sort order, but the reverse combination would've provoked coredump from
trying to dereference integers.  With this fix you get more reasonable
behavior:
pltcl_test=# select * from T_pkey1 order by key1, key2 using @<;
ERROR:  Unable to identify an operator '@<' for types 'bpchar' and 'bpchar'
        You will have to retype this query using an explicit cast
This commit is contained in:
Tom Lane 2001-02-16 03:16:58 +00:00
parent b24b2a5be0
commit 13cc7eb3e2
9 changed files with 162 additions and 90 deletions

View File

@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/commands/analyze.c,v 1.13 2001/01/24 19:42:52 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/commands/analyze.c,v 1.14 2001/02/16 03:16:58 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -152,7 +152,6 @@ analyze_rel(Oid relid, List *anal_cols2, int MESSAGE_LEVEL)
for (i = 0; i < attr_cnt; i++)
{
Operator func_operator;
Form_pg_operator pgopform;
VacAttrStats *stats;
stats = &vacattrstats[i];
@ -167,21 +166,25 @@ analyze_rel(Oid relid, List *anal_cols2, int MESSAGE_LEVEL)
stats->best_cnt = stats->guess1_cnt = stats->guess1_hits = stats->guess2_hits = 0;
stats->max_cnt = stats->min_cnt = stats->null_cnt = stats->nonnull_cnt = 0;
func_operator = oper("=", stats->attr->atttypid, stats->attr->atttypid, true);
func_operator = compatible_oper("=",
stats->attr->atttypid,
stats->attr->atttypid,
true);
if (func_operator != NULL)
{
pgopform = (Form_pg_operator) GETSTRUCT(func_operator);
fmgr_info(pgopform->oprcode, &(stats->f_cmpeq));
fmgr_info(oprfuncid(func_operator), &(stats->f_cmpeq));
ReleaseSysCache(func_operator);
}
else
stats->f_cmpeq.fn_addr = NULL;
func_operator = oper("<", stats->attr->atttypid, stats->attr->atttypid, true);
func_operator = compatible_oper("<",
stats->attr->atttypid,
stats->attr->atttypid,
true);
if (func_operator != NULL)
{
pgopform = (Form_pg_operator) GETSTRUCT(func_operator);
fmgr_info(pgopform->oprcode, &(stats->f_cmplt));
fmgr_info(oprfuncid(func_operator), &(stats->f_cmplt));
stats->op_cmplt = oprid(func_operator);
ReleaseSysCache(func_operator);
}
@ -191,11 +194,13 @@ analyze_rel(Oid relid, List *anal_cols2, int MESSAGE_LEVEL)
stats->op_cmplt = InvalidOid;
}
func_operator = oper(">", stats->attr->atttypid, stats->attr->atttypid, true);
func_operator = compatible_oper(">",
stats->attr->atttypid,
stats->attr->atttypid,
true);
if (func_operator != NULL)
{
pgopform = (Form_pg_operator) GETSTRUCT(func_operator);
fmgr_info(pgopform->oprcode, &(stats->f_cmpgt));
fmgr_info(oprfuncid(func_operator), &(stats->f_cmpgt));
ReleaseSysCache(func_operator);
}
else

View File

@ -46,7 +46,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/executor/nodeAgg.c,v 1.74 2001/02/15 21:47:08 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/executor/nodeAgg.c,v 1.75 2001/02/16 03:16:57 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -909,21 +909,19 @@ ExecInitAgg(Agg *node, EState *estate, Plan *parent)
* (Consider COUNT(*).)
*/
Oid inputType = exprType(aggref->target);
Operator eq_operator;
Form_pg_operator pgopform;
Oid eq_function;
peraggstate->inputType = inputType;
get_typlenbyval(inputType,
&peraggstate->inputtypeLen,
&peraggstate->inputtypeByVal);
eq_operator = oper("=", inputType, inputType, true);
if (!HeapTupleIsValid(eq_operator))
eq_function = compatible_oper_funcid("=", inputType, inputType,
true);
if (!OidIsValid(eq_function))
elog(ERROR, "Unable to identify an equality operator for type '%s'",
typeidTypeName(inputType));
pgopform = (Form_pg_operator) GETSTRUCT(eq_operator);
fmgr_info(pgopform->oprcode, &(peraggstate->equalfn));
ReleaseSysCache(eq_operator);
fmgr_info(eq_function, &(peraggstate->equalfn));
peraggstate->sortOperator = any_ordering_op(inputType);
peraggstate->sortstate = NULL;
}

View File

@ -15,7 +15,7 @@
* locate group boundaries.
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/executor/nodeGroup.c,v 1.40 2001/01/24 19:42:54 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/executor/nodeGroup.c,v 1.41 2001/02/16 03:16:57 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -495,16 +495,13 @@ execTuplesMatchPrepare(TupleDesc tupdesc,
{
AttrNumber att = matchColIdx[i];
Oid typid = tupdesc->attrs[att - 1]->atttypid;
Operator eq_operator;
Form_pg_operator pgopform;
Oid eq_function;
eq_operator = oper("=", typid, typid, true);
if (!HeapTupleIsValid(eq_operator))
eq_function = compatible_oper_funcid("=", typid, typid, true);
if (!OidIsValid(eq_function))
elog(ERROR, "Unable to identify an equality operator for type '%s'",
typeidTypeName(typid));
pgopform = (Form_pg_operator) GETSTRUCT(eq_operator);
fmgr_info(pgopform->oprcode, &eqfunctions[i]);
ReleaseSysCache(eq_operator);
fmgr_info(eq_function, &eqfunctions[i]);
}
return eqfunctions;

View File

@ -9,7 +9,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/optimizer/path/indxpath.c,v 1.101 2001/01/24 19:42:57 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/optimizer/path/indxpath.c,v 1.102 2001/02/16 03:16:57 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -878,7 +878,7 @@ indexable_operator(Expr *clause, Oid opclass, Oid relam,
* (In theory this might find a non-semantically-comparable operator,
* but in practice that seems pretty unlikely for binary-compatible types.)
*/
new_op = oper_oid(opname, indexkeytype, indexkeytype, true);
new_op = compatible_oper_opid(opname, indexkeytype, indexkeytype, true);
if (OidIsValid(new_op))
{

View File

@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/optimizer/plan/initsplan.c,v 1.56 2001/01/24 19:42:58 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/optimizer/plan/initsplan.c,v 1.57 2001/02/16 03:16:57 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -617,7 +617,7 @@ process_implied_equality(Query *root, Node *item1, Node *item2,
*/
ltype = exprType(item1);
rtype = exprType(item2);
eq_operator = oper("=", ltype, rtype, true);
eq_operator = compatible_oper("=", ltype, rtype, true);
if (!HeapTupleIsValid(eq_operator))
{
/*

View File

@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/parser/parse_clause.c,v 1.76 2001/02/14 21:35:03 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/parser/parse_clause.c,v 1.77 2001/02/16 03:16:57 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -1155,10 +1155,10 @@ addTargetToSortList(TargetEntry *tle, List *sortlist, List *targetlist,
sortcl->tleSortGroupRef = assignSortGroupRef(tle, targetlist);
if (opname)
sortcl->sortop = oper_oid(opname,
tle->resdom->restype,
tle->resdom->restype,
false);
sortcl->sortop = compatible_oper_opid(opname,
tle->resdom->restype,
tle->resdom->restype,
false);
else
sortcl->sortop = any_ordering_op(tle->resdom->restype);

View File

@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/parser/parse_expr.c,v 1.90 2001/02/14 21:35:04 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/parser/parse_expr.c,v 1.91 2001/02/16 03:16:58 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -383,6 +383,11 @@ transformExpr(ParseState *pstate, Node *expr, int precedence)
lexpr = lfirst(left_list);
left_list = lnext(left_list);
/*
* It's OK to use oper() not compatible_oper() here,
* because make_subplan() will insert type coercion
* calls if needed.
*/
optup = oper(op,
exprType(lexpr),
exprType(tent->expr),

View File

@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/parser/parse_oper.c,v 1.46 2001/01/24 19:43:02 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/parser/parse_oper.c,v 1.47 2001/02/16 03:16:58 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -40,15 +40,15 @@ static void unary_op_error(char *op, Oid arg, bool is_left_op);
/* Select an ordering operator for the given datatype */
Oid
any_ordering_op(Oid restype)
any_ordering_op(Oid argtype)
{
Oid order_opid;
order_opid = oper_oid("<", restype, restype, true);
order_opid = compatible_oper_opid("<", argtype, argtype, true);
if (!OidIsValid(order_opid))
elog(ERROR, "Unable to identify an ordering operator '%s' for type '%s'"
"\n\tUse an explicit ordering operator or modify the query",
"<", typeidTypeName(restype));
"<", typeidTypeName(argtype));
return order_opid;
}
@ -59,6 +59,15 @@ oprid(Operator op)
return op->t_data->t_oid;
}
/* given operator tuple, return the underlying function's OID */
Oid
oprfuncid(Operator op)
{
Form_pg_operator pgopform = (Form_pg_operator) GETSTRUCT(op);
return pgopform->oprcode;
}
/* binary_oper_get_candidates()
* given opname, find all possible input type pairs for which an operator
@ -119,7 +128,7 @@ binary_oper_get_candidates(char *opname,
/* oper_select_candidate()
* Given the input argtype array and more than one candidate
* Given the input argtype array and one or more candidates
* for the function argtype array, attempt to resolve the conflict.
* Returns the selected argtype array if the conflict can be resolved,
* otherwise returns NULL.
@ -593,34 +602,22 @@ oper_inexact(char *op, Oid arg1, Oid arg2)
if (ncandidates == 0)
return NULL;
/* Or found exactly one? Then proceed... */
else if (ncandidates == 1)
/* Otherwise, check for compatible datatypes, and then try to resolve
* the conflict if more than one candidate remains.
*/
inputOids[0] = arg1;
inputOids[1] = arg2;
targetOids = oper_select_candidate(2, inputOids, candidates);
if (targetOids != NULL)
{
tup = SearchSysCache(OPERNAME,
PointerGetDatum(op),
ObjectIdGetDatum(candidates->args[0]),
ObjectIdGetDatum(candidates->args[1]),
ObjectIdGetDatum(targetOids[0]),
ObjectIdGetDatum(targetOids[1]),
CharGetDatum('b'));
Assert(HeapTupleIsValid(tup));
}
/* Otherwise, multiple operators of the desired types found... */
else
{
inputOids[0] = arg1;
inputOids[1] = arg2;
targetOids = oper_select_candidate(2, inputOids, candidates);
if (targetOids != NULL)
{
tup = SearchSysCache(OPERNAME,
PointerGetDatum(op),
ObjectIdGetDatum(targetOids[0]),
ObjectIdGetDatum(targetOids[1]),
CharGetDatum('b'));
}
else
tup = NULL;
}
tup = NULL;
return (Operator) tup;
}
@ -628,6 +625,10 @@ oper_inexact(char *op, Oid arg1, Oid arg2)
/* oper() -- search for a binary operator
* Given operator name, types of arg1 and arg2, return oper struct.
*
* IMPORTANT: the returned operator (if any) is only promised to be
* coercion-compatible with the input datatypes. Do not use this if
* you need an exact- or binary-compatible match; see compatible_oper.
*
* If no matching operator found, return NULL if noError is true,
* raise an error if it is false.
*
@ -653,19 +654,51 @@ oper(char *opname, Oid ltypeId, Oid rtypeId, bool noError)
return (Operator) NULL;
}
/* oper_oid() -- get OID of a binary operator
/* compatible_oper()
* given an opname and input datatypes, find a compatible binary operator
*
* This is tighter than oper() because it will not return an operator that
* requires coercion of the input datatypes (but binary-compatible operators
* are accepted). Otherwise, the semantics are the same.
*/
Operator
compatible_oper(char *op, Oid arg1, Oid arg2, bool noError)
{
Operator optup;
Form_pg_operator opform;
/* oper() will find the best available match */
optup = oper(op, arg1, arg2, noError);
if (optup == (Operator) NULL)
return (Operator) NULL; /* must be noError case */
/* but is it good enough? */
opform = (Form_pg_operator) GETSTRUCT(optup);
if ((opform->oprleft == arg1 ||
IS_BINARY_COMPATIBLE(opform->oprleft, arg1)) &&
(opform->oprright == arg2 ||
IS_BINARY_COMPATIBLE(opform->oprright, arg2)))
return optup;
if (!noError)
op_error(op, arg1, arg2);
return (Operator) NULL;
}
/* compatible_oper_opid() -- get OID of a binary operator
*
* This is a convenience routine that extracts only the operator OID
* from the result of oper(). InvalidOid is returned if the lookup
* fails and noError is true.
* from the result of compatible_oper(). InvalidOid is returned if the
* lookup fails and noError is true.
*/
Oid
oper_oid(char *op, Oid arg1, Oid arg2, bool noError)
compatible_oper_opid(char *op, Oid arg1, Oid arg2, bool noError)
{
Operator optup;
Oid result;
optup = oper(op, arg1, arg2, noError);
optup = compatible_oper(op, arg1, arg2, noError);
if (optup != NULL)
{
result = oprid(optup);
@ -675,6 +708,28 @@ oper_oid(char *op, Oid arg1, Oid arg2, bool noError)
return InvalidOid;
}
/* compatible_oper_funcid() -- get OID of a binary operator's function
*
* This is a convenience routine that extracts only the function OID
* from the result of compatible_oper(). InvalidOid is returned if the
* lookup fails and noError is true.
*/
Oid
compatible_oper_funcid(char *op, Oid arg1, Oid arg2, bool noError)
{
Operator optup;
Oid result;
optup = compatible_oper(op, arg1, arg2, noError);
if (optup != NULL)
{
result = oprfuncid(optup);
ReleaseSysCache(optup);
return result;
}
return InvalidOid;
}
/* unary_oper_get_candidates()
* given opname, find all possible types for which
* a right/left unary operator named opname exists.
@ -737,6 +792,10 @@ unary_oper_get_candidates(char *opname,
/* Given unary right operator (operator on right), return oper struct
*
* IMPORTANT: the returned operator (if any) is only promised to be
* coercion-compatible with the input datatype. Do not use this if
* you need an exact- or binary-compatible match.
*
* Always raises error on failure.
*
@ -764,16 +823,11 @@ right_oper(char *op, Oid arg)
ncandidates = unary_oper_get_candidates(op, &candidates, 'r');
if (ncandidates == 0)
unary_op_error(op, arg, FALSE);
else if (ncandidates == 1)
{
tup = SearchSysCache(OPERNAME,
PointerGetDatum(op),
ObjectIdGetDatum(candidates->args[0]),
ObjectIdGetDatum(InvalidOid),
CharGetDatum('r'));
}
else
{
/* We must run oper_select_candidate even if only one candidate,
* otherwise we may falsely return a non-type-compatible operator.
*/
targetOid = oper_select_candidate(1, &arg, candidates);
if (targetOid != NULL)
tup = SearchSysCache(OPERNAME,
@ -792,6 +846,10 @@ right_oper(char *op, Oid arg)
/* Given unary left operator (operator on left), return oper struct
*
* IMPORTANT: the returned operator (if any) is only promised to be
* coercion-compatible with the input datatype. Do not use this if
* you need an exact- or binary-compatible match.
*
* Always raises error on failure.
*
@ -819,16 +877,11 @@ left_oper(char *op, Oid arg)
ncandidates = unary_oper_get_candidates(op, &candidates, 'l');
if (ncandidates == 0)
unary_op_error(op, arg, TRUE);
else if (ncandidates == 1)
{
tup = SearchSysCache(OPERNAME,
PointerGetDatum(op),
ObjectIdGetDatum(InvalidOid),
ObjectIdGetDatum(candidates->args[0]),
CharGetDatum('l'));
}
else
{
/* We must run oper_select_candidate even if only one candidate,
* otherwise we may falsely return a non-type-compatible operator.
*/
targetOid = oper_select_candidate(1, &arg, candidates);
if (targetOid != NULL)
tup = SearchSysCache(OPERNAME,

View File

@ -1,13 +1,13 @@
/*-------------------------------------------------------------------------
*
* catalog_utils.h
* parse_oper.h
*
*
*
* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $Id: parse_oper.h,v 1.13 2001/01/24 19:43:27 momjian Exp $
* $Id: parse_oper.h,v 1.14 2001/02/16 03:16:58 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -18,13 +18,27 @@
typedef HeapTuple Operator;
/* Routines to find operators matching a name and given input types */
/* NB: the selected operator may require coercion of the input types! */
extern Operator oper(char *op, Oid arg1, Oid arg2, bool noError);
extern Operator right_oper(char *op, Oid arg);
extern Operator left_oper(char *op, Oid arg);
extern Oid oper_oid(char *op, Oid arg1, Oid arg2, bool noError);
extern Oid oprid(Operator op);
/* Routines to find operators that DO NOT require coercion --- ie, their */
/* input types are either exactly as given, or binary-compatible */
extern Operator compatible_oper(char *op, Oid arg1, Oid arg2, bool noError);
/* currently no need for compatible_left_oper/compatible_right_oper */
extern Oid any_ordering_op(Oid restype);
/* Convenience routines that call compatible_oper() and return either */
/* the operator OID or the underlying function OID, or InvalidOid if fail */
extern Oid compatible_oper_opid(char *op, Oid arg1, Oid arg2, bool noError);
extern Oid compatible_oper_funcid(char *op, Oid arg1, Oid arg2, bool noError);
/* Convenience routine that packages a specific call on compatible_oper */
extern Oid any_ordering_op(Oid argtype);
/* Extract operator OID or underlying-function OID from an Operator tuple */
extern Oid oprid(Operator op);
extern Oid oprfuncid(Operator op);
#endif /* PARSE_OPER_H */