/*------------------------------------------------------------------------- * * numeric.c * An exact numeric data type for the Postgres database system * * Original coding 1998, Jan Wieck. Heavily revised 2003, Tom Lane. * * Many of the algorithmic ideas are borrowed from David M. Smith's "FM" * multiple-precision math library, most recently published as Algorithm * 786: Multiple-Precision Complex Arithmetic and Functions, ACM * Transactions on Mathematical Software, Vol. 24, No. 4, December 1998, * pages 359-367. * * Copyright (c) 1998-2009, PostgreSQL Global Development Group * * IDENTIFICATION * $PostgreSQL: pgsql/src/backend/utils/adt/numeric.c,v 1.118 2009/06/11 14:49:03 momjian Exp $ * *------------------------------------------------------------------------- */ #include "postgres.h" #include #include #include #include #include "access/hash.h" #include "catalog/pg_type.h" #include "libpq/pqformat.h" #include "miscadmin.h" #include "utils/array.h" #include "utils/builtins.h" #include "utils/int8.h" #include "utils/numeric.h" /* ---------- * Uncomment the following to enable compilation of dump_numeric() * and dump_var() and to get a dump of any result produced by make_result(). * ---------- #define NUMERIC_DEBUG */ /* ---------- * Local data types * * Numeric values are represented in a base-NBASE floating point format. * Each "digit" ranges from 0 to NBASE-1. The type NumericDigit is signed * and wide enough to store a digit. We assume that NBASE*NBASE can fit in * an int. Although the purely calculational routines could handle any even * NBASE that's less than sqrt(INT_MAX), in practice we are only interested * in NBASE a power of ten, so that I/O conversions and decimal rounding * are easy. Also, it's actually more efficient if NBASE is rather less than * sqrt(INT_MAX), so that there is "headroom" for mul_var and div_var_fast to * postpone processing carries. * ---------- */ #if 0 #define NBASE 10 #define HALF_NBASE 5 #define DEC_DIGITS 1 /* decimal digits per NBASE digit */ #define MUL_GUARD_DIGITS 4 /* these are measured in NBASE digits */ #define DIV_GUARD_DIGITS 8 typedef signed char NumericDigit; #endif #if 0 #define NBASE 100 #define HALF_NBASE 50 #define DEC_DIGITS 2 /* decimal digits per NBASE digit */ #define MUL_GUARD_DIGITS 3 /* these are measured in NBASE digits */ #define DIV_GUARD_DIGITS 6 typedef signed char NumericDigit; #endif #if 1 #define NBASE 10000 #define HALF_NBASE 5000 #define DEC_DIGITS 4 /* decimal digits per NBASE digit */ #define MUL_GUARD_DIGITS 2 /* these are measured in NBASE digits */ #define DIV_GUARD_DIGITS 4 typedef int16 NumericDigit; #endif /* ---------- * NumericVar is the format we use for arithmetic. The digit-array part * is the same as the NumericData storage format, but the header is more * complex. * * The value represented by a NumericVar is determined by the sign, weight, * ndigits, and digits[] array. * Note: the first digit of a NumericVar's value is assumed to be multiplied * by NBASE ** weight. Another way to say it is that there are weight+1 * digits before the decimal point. It is possible to have weight < 0. * * buf points at the physical start of the palloc'd digit buffer for the * NumericVar. digits points at the first digit in actual use (the one * with the specified weight). We normally leave an unused digit or two * (preset to zeroes) between buf and digits, so that there is room to store * a carry out of the top digit without reallocating space. We just need to * decrement digits (and increment weight) to make room for the carry digit. * (There is no such extra space in a numeric value stored in the database, * only in a NumericVar in memory.) * * If buf is NULL then the digit buffer isn't actually palloc'd and should * not be freed --- see the constants below for an example. * * dscale, or display scale, is the nominal precision expressed as number * of digits after the decimal point (it must always be >= 0 at present). * dscale may be more than the number of physically stored fractional digits, * implying that we have suppressed storage of significant trailing zeroes. * It should never be less than the number of stored digits, since that would * imply hiding digits that are present. NOTE that dscale is always expressed * in *decimal* digits, and so it may correspond to a fractional number of * base-NBASE digits --- divide by DEC_DIGITS to convert to NBASE digits. * * rscale, or result scale, is the target precision for a computation. * Like dscale it is expressed as number of *decimal* digits after the decimal * point, and is always >= 0 at present. * Note that rscale is not stored in variables --- it's figured on-the-fly * from the dscales of the inputs. * * NB: All the variable-level functions are written in a style that makes it * possible to give one and the same variable as argument and destination. * This is feasible because the digit buffer is separate from the variable. * ---------- */ typedef struct NumericVar { int ndigits; /* # of digits in digits[] - can be 0! */ int weight; /* weight of first digit */ int sign; /* NUMERIC_POS, NUMERIC_NEG, or NUMERIC_NAN */ int dscale; /* display scale */ NumericDigit *buf; /* start of palloc'd space for digits[] */ NumericDigit *digits; /* base-NBASE digits */ } NumericVar; /* ---------- * Some preinitialized constants * ---------- */ static NumericDigit const_zero_data[1] = {0}; static NumericVar const_zero = {0, 0, NUMERIC_POS, 0, NULL, const_zero_data}; static NumericDigit const_one_data[1] = {1}; static NumericVar const_one = {1, 0, NUMERIC_POS, 0, NULL, const_one_data}; static NumericDigit const_two_data[1] = {2}; static NumericVar const_two = {1, 0, NUMERIC_POS, 0, NULL, const_two_data}; #if DEC_DIGITS == 4 static NumericDigit const_zero_point_five_data[1] = {5000}; #elif DEC_DIGITS == 2 static NumericDigit const_zero_point_five_data[1] = {50}; #elif DEC_DIGITS == 1 static NumericDigit const_zero_point_five_data[1] = {5}; #endif static NumericVar const_zero_point_five = {1, -1, NUMERIC_POS, 1, NULL, const_zero_point_five_data}; #if DEC_DIGITS == 4 static NumericDigit const_zero_point_nine_data[1] = {9000}; #elif DEC_DIGITS == 2 static NumericDigit const_zero_point_nine_data[1] = {90}; #elif DEC_DIGITS == 1 static NumericDigit const_zero_point_nine_data[1] = {9}; #endif static NumericVar const_zero_point_nine = {1, -1, NUMERIC_POS, 1, NULL, const_zero_point_nine_data}; #if DEC_DIGITS == 4 static NumericDigit const_zero_point_01_data[1] = {100}; static NumericVar const_zero_point_01 = {1, -1, NUMERIC_POS, 2, NULL, const_zero_point_01_data}; #elif DEC_DIGITS == 2 static NumericDigit const_zero_point_01_data[1] = {1}; static NumericVar const_zero_point_01 = {1, -1, NUMERIC_POS, 2, NULL, const_zero_point_01_data}; #elif DEC_DIGITS == 1 static NumericDigit const_zero_point_01_data[1] = {1}; static NumericVar const_zero_point_01 = {1, -2, NUMERIC_POS, 2, NULL, const_zero_point_01_data}; #endif #if DEC_DIGITS == 4 static NumericDigit const_one_point_one_data[2] = {1, 1000}; #elif DEC_DIGITS == 2 static NumericDigit const_one_point_one_data[2] = {1, 10}; #elif DEC_DIGITS == 1 static NumericDigit const_one_point_one_data[2] = {1, 1}; #endif static NumericVar const_one_point_one = {2, 0, NUMERIC_POS, 1, NULL, const_one_point_one_data}; static NumericVar const_nan = {0, 0, NUMERIC_NAN, 0, NULL, NULL}; #if DEC_DIGITS == 4 static const int round_powers[4] = {0, 1000, 100, 10}; #endif /* ---------- * Local functions * ---------- */ #ifdef NUMERIC_DEBUG static void dump_numeric(const char *str, Numeric num); static void dump_var(const char *str, NumericVar *var); #else #define dump_numeric(s,n) #define dump_var(s,v) #endif #define digitbuf_alloc(ndigits) \ ((NumericDigit *) palloc((ndigits) * sizeof(NumericDigit))) #define digitbuf_free(buf) \ do { \ if ((buf) != NULL) \ pfree(buf); \ } while (0) #define init_var(v) MemSetAligned(v, 0, sizeof(NumericVar)) #define NUMERIC_DIGITS(num) ((NumericDigit *)(num)->n_data) #define NUMERIC_NDIGITS(num) \ ((VARSIZE(num) - NUMERIC_HDRSZ) / sizeof(NumericDigit)) static void alloc_var(NumericVar *var, int ndigits); static void free_var(NumericVar *var); static void zero_var(NumericVar *var); static const char *set_var_from_str(const char *str, const char *cp, NumericVar *dest); static void set_var_from_num(Numeric value, NumericVar *dest); static void set_var_from_var(NumericVar *value, NumericVar *dest); static char *get_str_from_var(NumericVar *var, int dscale); static Numeric make_result(NumericVar *var); static void apply_typmod(NumericVar *var, int32 typmod); static int32 numericvar_to_int4(NumericVar *var); static bool numericvar_to_int8(NumericVar *var, int64 *result); static void int8_to_numericvar(int64 val, NumericVar *var); static double numeric_to_double_no_overflow(Numeric num); static double numericvar_to_double_no_overflow(NumericVar *var); static int cmp_numerics(Numeric num1, Numeric num2); static int cmp_var(NumericVar *var1, NumericVar *var2); static int cmp_var_common(const NumericDigit *var1digits, int var1ndigits, int var1weight, int var1sign, const NumericDigit *var2digits, int var2ndigits, int var2weight, int var2sign); static void add_var(NumericVar *var1, NumericVar *var2, NumericVar *result); static void sub_var(NumericVar *var1, NumericVar *var2, NumericVar *result); static void mul_var(NumericVar *var1, NumericVar *var2, NumericVar *result, int rscale); static void div_var(NumericVar *var1, NumericVar *var2, NumericVar *result, int rscale, bool round); static void div_var_fast(NumericVar *var1, NumericVar *var2, NumericVar *result, int rscale, bool round); static int select_div_scale(NumericVar *var1, NumericVar *var2); static void mod_var(NumericVar *var1, NumericVar *var2, NumericVar *result); static void ceil_var(NumericVar *var, NumericVar *result); static void floor_var(NumericVar *var, NumericVar *result); static void sqrt_var(NumericVar *arg, NumericVar *result, int rscale); static void exp_var(NumericVar *arg, NumericVar *result, int rscale); static void exp_var_internal(NumericVar *arg, NumericVar *result, int rscale); static void ln_var(NumericVar *arg, NumericVar *result, int rscale); static void log_var(NumericVar *base, NumericVar *num, NumericVar *result); static void power_var(NumericVar *base, NumericVar *exp, NumericVar *result); static void power_var_int(NumericVar *base, int exp, NumericVar *result, int rscale); static int cmp_abs(NumericVar *var1, NumericVar *var2); static int cmp_abs_common(const NumericDigit *var1digits, int var1ndigits, int var1weight, const NumericDigit *var2digits, int var2ndigits, int var2weight); static void add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result); static void sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result); static void round_var(NumericVar *var, int rscale); static void trunc_var(NumericVar *var, int rscale); static void strip_var(NumericVar *var); static void compute_bucket(Numeric operand, Numeric bound1, Numeric bound2, NumericVar *count_var, NumericVar *result_var); /* ---------------------------------------------------------------------- * * Input-, output- and rounding-functions * * ---------------------------------------------------------------------- */ /* * numeric_in() - * * Input function for numeric data type */ Datum numeric_in(PG_FUNCTION_ARGS) { char *str = PG_GETARG_CSTRING(0); #ifdef NOT_USED Oid typelem = PG_GETARG_OID(1); #endif int32 typmod = PG_GETARG_INT32(2); Numeric res; const char *cp; /* Skip leading spaces */ cp = str; while (*cp) { if (!isspace((unsigned char) *cp)) break; cp++; } /* * Check for NaN */ if (pg_strncasecmp(cp, "NaN", 3) == 0) { res = make_result(&const_nan); /* Should be nothing left but spaces */ cp += 3; while (*cp) { if (!isspace((unsigned char) *cp)) ereport(ERROR, (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), errmsg("invalid input syntax for type numeric: \"%s\"", str))); cp++; } } else { /* * Use set_var_from_str() to parse a normal numeric value */ NumericVar value; init_var(&value); cp = set_var_from_str(str, cp, &value); /* * We duplicate a few lines of code here because we would like to * throw any trailing-junk syntax error before any semantic error * resulting from apply_typmod. We can't easily fold the two cases * together because we mustn't apply apply_typmod to a NaN. */ while (*cp) { if (!isspace((unsigned char) *cp)) ereport(ERROR, (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), errmsg("invalid input syntax for type numeric: \"%s\"", str))); cp++; } apply_typmod(&value, typmod); res = make_result(&value); free_var(&value); } PG_RETURN_NUMERIC(res); } /* * numeric_out() - * * Output function for numeric data type */ Datum numeric_out(PG_FUNCTION_ARGS) { Numeric num = PG_GETARG_NUMERIC(0); NumericVar x; char *str; /* * Handle NaN */ if (NUMERIC_IS_NAN(num)) PG_RETURN_CSTRING(pstrdup("NaN")); /* * Get the number in the variable format. * * Even if we didn't need to change format, we'd still need to copy the * value to have a modifiable copy for rounding. set_var_from_num() also * guarantees there is extra digit space in case we produce a carry out * from rounding. */ init_var(&x); set_var_from_num(num, &x); str = get_str_from_var(&x, x.dscale); free_var(&x); PG_RETURN_CSTRING(str); } /* * numeric_recv - converts external binary format to numeric * * External format is a sequence of int16's: * ndigits, weight, sign, dscale, NumericDigits. */ Datum numeric_recv(PG_FUNCTION_ARGS) { StringInfo buf = (StringInfo) PG_GETARG_POINTER(0); #ifdef NOT_USED Oid typelem = PG_GETARG_OID(1); #endif int32 typmod = PG_GETARG_INT32(2); NumericVar value; Numeric res; int len, i; init_var(&value); len = (uint16) pq_getmsgint(buf, sizeof(uint16)); if (len < 0 || len > NUMERIC_MAX_PRECISION + NUMERIC_MAX_RESULT_SCALE) ereport(ERROR, (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION), errmsg("invalid length in external \"numeric\" value"))); alloc_var(&value, len); value.weight = (int16) pq_getmsgint(buf, sizeof(int16)); value.sign = (uint16) pq_getmsgint(buf, sizeof(uint16)); if (!(value.sign == NUMERIC_POS || value.sign == NUMERIC_NEG || value.sign == NUMERIC_NAN)) ereport(ERROR, (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION), errmsg("invalid sign in external \"numeric\" value"))); value.dscale = (uint16) pq_getmsgint(buf, sizeof(uint16)); for (i = 0; i < len; i++) { NumericDigit d = pq_getmsgint(buf, sizeof(NumericDigit)); if (d < 0 || d >= NBASE) ereport(ERROR, (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION), errmsg("invalid digit in external \"numeric\" value"))); value.digits[i] = d; } apply_typmod(&value, typmod); res = make_result(&value); free_var(&value); PG_RETURN_NUMERIC(res); } /* * numeric_send - converts numeric to binary format */ Datum numeric_send(PG_FUNCTION_ARGS) { Numeric num = PG_GETARG_NUMERIC(0); NumericVar x; StringInfoData buf; int i; init_var(&x); set_var_from_num(num, &x); pq_begintypsend(&buf); pq_sendint(&buf, x.ndigits, sizeof(int16)); pq_sendint(&buf, x.weight, sizeof(int16)); pq_sendint(&buf, x.sign, sizeof(int16)); pq_sendint(&buf, x.dscale, sizeof(int16)); for (i = 0; i < x.ndigits; i++) pq_sendint(&buf, x.digits[i], sizeof(NumericDigit)); free_var(&x); PG_RETURN_BYTEA_P(pq_endtypsend(&buf)); } /* * numeric() - * * This is a special function called by the Postgres database system * before a value is stored in a tuple's attribute. The precision and * scale of the attribute have to be applied on the value. */ Datum numeric (PG_FUNCTION_ARGS) { Numeric num = PG_GETARG_NUMERIC(0); int32 typmod = PG_GETARG_INT32(1); Numeric new; int32 tmp_typmod; int precision; int scale; int ddigits; int maxdigits; NumericVar var; /* * Handle NaN */ if (NUMERIC_IS_NAN(num)) PG_RETURN_NUMERIC(make_result(&const_nan)); /* * If the value isn't a valid type modifier, simply return a copy of the * input value */ if (typmod < (int32) (VARHDRSZ)) { new = (Numeric) palloc(VARSIZE(num)); memcpy(new, num, VARSIZE(num)); PG_RETURN_NUMERIC(new); } /* * Get the precision and scale out of the typmod value */ tmp_typmod = typmod - VARHDRSZ; precision = (tmp_typmod >> 16) & 0xffff; scale = tmp_typmod & 0xffff; maxdigits = precision - scale; /* * If the number is certainly in bounds and due to the target scale no * rounding could be necessary, just make a copy of the input and modify * its scale fields. (Note we assume the existing dscale is honest...) */ ddigits = (num->n_weight + 1) * DEC_DIGITS; if (ddigits <= maxdigits && scale >= NUMERIC_DSCALE(num)) { new = (Numeric) palloc(VARSIZE(num)); memcpy(new, num, VARSIZE(num)); new->n_sign_dscale = NUMERIC_SIGN(new) | ((uint16) scale & NUMERIC_DSCALE_MASK); PG_RETURN_NUMERIC(new); } /* * We really need to fiddle with things - unpack the number into a * variable and let apply_typmod() do it. */ init_var(&var); set_var_from_num(num, &var); apply_typmod(&var, typmod); new = make_result(&var); free_var(&var); PG_RETURN_NUMERIC(new); } Datum numerictypmodin(PG_FUNCTION_ARGS) { ArrayType *ta = PG_GETARG_ARRAYTYPE_P(0); int32 *tl; int n; int32 typmod; tl = ArrayGetIntegerTypmods(ta, &n); if (n == 2) { if (tl[0] < 1 || tl[0] > NUMERIC_MAX_PRECISION) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("NUMERIC precision %d must be between 1 and %d", tl[0], NUMERIC_MAX_PRECISION))); if (tl[1] < 0 || tl[1] > tl[0]) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("NUMERIC scale %d must be between 0 and precision %d", tl[1], tl[0]))); typmod = ((tl[0] << 16) | tl[1]) + VARHDRSZ; } else if (n == 1) { if (tl[0] < 1 || tl[0] > NUMERIC_MAX_PRECISION) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("NUMERIC precision %d must be between 1 and %d", tl[0], NUMERIC_MAX_PRECISION))); /* scale defaults to zero */ typmod = (tl[0] << 16) + VARHDRSZ; } else { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid NUMERIC type modifier"))); typmod = 0; /* keep compiler quiet */ } PG_RETURN_INT32(typmod); } Datum numerictypmodout(PG_FUNCTION_ARGS) { int32 typmod = PG_GETARG_INT32(0); char *res = (char *) palloc(64); if (typmod >= 0) snprintf(res, 64, "(%d,%d)", ((typmod - VARHDRSZ) >> 16) & 0xffff, (typmod - VARHDRSZ) & 0xffff); else *res = '\0'; PG_RETURN_CSTRING(res); } /* ---------------------------------------------------------------------- * * Sign manipulation, rounding and the like * * ---------------------------------------------------------------------- */ Datum numeric_abs(PG_FUNCTION_ARGS) { Numeric num = PG_GETARG_NUMERIC(0); Numeric res; /* * Handle NaN */ if (NUMERIC_IS_NAN(num)) PG_RETURN_NUMERIC(make_result(&const_nan)); /* * Do it the easy way directly on the packed format */ res = (Numeric) palloc(VARSIZE(num)); memcpy(res, num, VARSIZE(num)); res->n_sign_dscale = NUMERIC_POS | NUMERIC_DSCALE(num); PG_RETURN_NUMERIC(res); } Datum numeric_uminus(PG_FUNCTION_ARGS) { Numeric num = PG_GETARG_NUMERIC(0); Numeric res; /* * Handle NaN */ if (NUMERIC_IS_NAN(num)) PG_RETURN_NUMERIC(make_result(&const_nan)); /* * Do it the easy way directly on the packed format */ res = (Numeric) palloc(VARSIZE(num)); memcpy(res, num, VARSIZE(num)); /* * The packed format is known to be totally zero digit trimmed always. So * we can identify a ZERO by the fact that there are no digits at all. Do * nothing to a zero. */ if (VARSIZE(num) != NUMERIC_HDRSZ) { /* Else, flip the sign */ if (NUMERIC_SIGN(num) == NUMERIC_POS) res->n_sign_dscale = NUMERIC_NEG | NUMERIC_DSCALE(num); else res->n_sign_dscale = NUMERIC_POS | NUMERIC_DSCALE(num); } PG_RETURN_NUMERIC(res); } Datum numeric_uplus(PG_FUNCTION_ARGS) { Numeric num = PG_GETARG_NUMERIC(0); Numeric res; res = (Numeric) palloc(VARSIZE(num)); memcpy(res, num, VARSIZE(num)); PG_RETURN_NUMERIC(res); } /* * numeric_sign() - * * returns -1 if the argument is less than 0, 0 if the argument is equal * to 0, and 1 if the argument is greater than zero. */ Datum numeric_sign(PG_FUNCTION_ARGS) { Numeric num = PG_GETARG_NUMERIC(0); Numeric res; NumericVar result; /* * Handle NaN */ if (NUMERIC_IS_NAN(num)) PG_RETURN_NUMERIC(make_result(&const_nan)); init_var(&result); /* * The packed format is known to be totally zero digit trimmed always. So * we can identify a ZERO by the fact that there are no digits at all. */ if (VARSIZE(num) == NUMERIC_HDRSZ) set_var_from_var(&const_zero, &result); else { /* * And if there are some, we return a copy of ONE with the sign of our * argument */ set_var_from_var(&const_one, &result); result.sign = NUMERIC_SIGN(num); } res = make_result(&result); free_var(&result); PG_RETURN_NUMERIC(res); } /* * numeric_round() - * * Round a value to have 'scale' digits after the decimal point. * We allow negative 'scale', implying rounding before the decimal * point --- Oracle interprets rounding that way. */ Datum numeric_round(PG_FUNCTION_ARGS) { Numeric num = PG_GETARG_NUMERIC(0); int32 scale = PG_GETARG_INT32(1); Numeric res; NumericVar arg; /* * Handle NaN */ if (NUMERIC_IS_NAN(num)) PG_RETURN_NUMERIC(make_result(&const_nan)); /* * Limit the scale value to avoid possible overflow in calculations */ scale = Max(scale, -NUMERIC_MAX_RESULT_SCALE); scale = Min(scale, NUMERIC_MAX_RESULT_SCALE); /* * Unpack the argument and round it at the proper digit position */ init_var(&arg); set_var_from_num(num, &arg); round_var(&arg, scale); /* We don't allow negative output dscale */ if (scale < 0) arg.dscale = 0; /* * Return the rounded result */ res = make_result(&arg); free_var(&arg); PG_RETURN_NUMERIC(res); } /* * numeric_trunc() - * * Truncate a value to have 'scale' digits after the decimal point. * We allow negative 'scale', implying a truncation before the decimal * point --- Oracle interprets truncation that way. */ Datum numeric_trunc(PG_FUNCTION_ARGS) { Numeric num = PG_GETARG_NUMERIC(0); int32 scale = PG_GETARG_INT32(1); Numeric res; NumericVar arg; /* * Handle NaN */ if (NUMERIC_IS_NAN(num)) PG_RETURN_NUMERIC(make_result(&const_nan)); /* * Limit the scale value to avoid possible overflow in calculations */ scale = Max(scale, -NUMERIC_MAX_RESULT_SCALE); scale = Min(scale, NUMERIC_MAX_RESULT_SCALE); /* * Unpack the argument and truncate it at the proper digit position */ init_var(&arg); set_var_from_num(num, &arg); trunc_var(&arg, scale); /* We don't allow negative output dscale */ if (scale < 0) arg.dscale = 0; /* * Return the truncated result */ res = make_result(&arg); free_var(&arg); PG_RETURN_NUMERIC(res); } /* * numeric_ceil() - * * Return the smallest integer greater than or equal to the argument */ Datum numeric_ceil(PG_FUNCTION_ARGS) { Numeric num = PG_GETARG_NUMERIC(0); Numeric res; NumericVar result; if (NUMERIC_IS_NAN(num)) PG_RETURN_NUMERIC(make_result(&const_nan)); init_var(&result); set_var_from_num(num, &result); ceil_var(&result, &result); res = make_result(&result); free_var(&result); PG_RETURN_NUMERIC(res); } /* * numeric_floor() - * * Return the largest integer equal to or less than the argument */ Datum numeric_floor(PG_FUNCTION_ARGS) { Numeric num = PG_GETARG_NUMERIC(0); Numeric res; NumericVar result; if (NUMERIC_IS_NAN(num)) PG_RETURN_NUMERIC(make_result(&const_nan)); init_var(&result); set_var_from_num(num, &result); floor_var(&result, &result); res = make_result(&result); free_var(&result); PG_RETURN_NUMERIC(res); } /* * Implements the numeric version of the width_bucket() function * defined by SQL2003. See also width_bucket_float8(). * * 'bound1' and 'bound2' are the lower and upper bounds of the * histogram's range, respectively. 'count' is the number of buckets * in the histogram. width_bucket() returns an integer indicating the * bucket number that 'operand' belongs to in an equiwidth histogram * with the specified characteristics. An operand smaller than the * lower bound is assigned to bucket 0. An operand greater than the * upper bound is assigned to an additional bucket (with number * count+1). We don't allow "NaN" for any of the numeric arguments. */ Datum width_bucket_numeric(PG_FUNCTION_ARGS) { Numeric operand = PG_GETARG_NUMERIC(0); Numeric bound1 = PG_GETARG_NUMERIC(1); Numeric bound2 = PG_GETARG_NUMERIC(2); int32 count = PG_GETARG_INT32(3); NumericVar count_var; NumericVar result_var; int32 result; if (count <= 0) ereport(ERROR, (errcode(ERRCODE_INVALID_ARGUMENT_FOR_WIDTH_BUCKET_FUNCTION), errmsg("count must be greater than zero"))); if (NUMERIC_IS_NAN(operand) || NUMERIC_IS_NAN(bound1) || NUMERIC_IS_NAN(bound2)) ereport(ERROR, (errcode(ERRCODE_INVALID_ARGUMENT_FOR_WIDTH_BUCKET_FUNCTION), errmsg("operand, lower bound and upper bound cannot be NaN"))); init_var(&result_var); init_var(&count_var); /* Convert 'count' to a numeric, for ease of use later */ int8_to_numericvar((int64) count, &count_var); switch (cmp_numerics(bound1, bound2)) { case 0: ereport(ERROR, (errcode(ERRCODE_INVALID_ARGUMENT_FOR_WIDTH_BUCKET_FUNCTION), errmsg("lower bound cannot equal upper bound"))); /* bound1 < bound2 */ case -1: if (cmp_numerics(operand, bound1) < 0) set_var_from_var(&const_zero, &result_var); else if (cmp_numerics(operand, bound2) >= 0) add_var(&count_var, &const_one, &result_var); else compute_bucket(operand, bound1, bound2, &count_var, &result_var); break; /* bound1 > bound2 */ case 1: if (cmp_numerics(operand, bound1) > 0) set_var_from_var(&const_zero, &result_var); else if (cmp_numerics(operand, bound2) <= 0) add_var(&count_var, &const_one, &result_var); else compute_bucket(operand, bound1, bound2, &count_var, &result_var); break; } /* if result exceeds the range of a legal int4, we ereport here */ result = numericvar_to_int4(&result_var); free_var(&count_var); free_var(&result_var); PG_RETURN_INT32(result); } /* * If 'operand' is not outside the bucket range, determine the correct * bucket for it to go. The calculations performed by this function * are derived directly from the SQL2003 spec. */ static void compute_bucket(Numeric operand, Numeric bound1, Numeric bound2, NumericVar *count_var, NumericVar *result_var) { NumericVar bound1_var; NumericVar bound2_var; NumericVar operand_var; init_var(&bound1_var); init_var(&bound2_var); init_var(&operand_var); set_var_from_num(bound1, &bound1_var); set_var_from_num(bound2, &bound2_var); set_var_from_num(operand, &operand_var); if (cmp_var(&bound1_var, &bound2_var) < 0) { sub_var(&operand_var, &bound1_var, &operand_var); sub_var(&bound2_var, &bound1_var, &bound2_var); div_var(&operand_var, &bound2_var, result_var, select_div_scale(&operand_var, &bound2_var), true); } else { sub_var(&bound1_var, &operand_var, &operand_var); sub_var(&bound1_var, &bound2_var, &bound1_var); div_var(&operand_var, &bound1_var, result_var, select_div_scale(&operand_var, &bound1_var), true); } mul_var(result_var, count_var, result_var, result_var->dscale + count_var->dscale); add_var(result_var, &const_one, result_var); floor_var(result_var, result_var); free_var(&bound1_var); free_var(&bound2_var); free_var(&operand_var); } /* ---------------------------------------------------------------------- * * Comparison functions * * Note: btree indexes need these routines not to leak memory; therefore, * be careful to free working copies of toasted datums. Most places don't * need to be so careful. * ---------------------------------------------------------------------- */ Datum numeric_cmp(PG_FUNCTION_ARGS) { Numeric num1 = PG_GETARG_NUMERIC(0); Numeric num2 = PG_GETARG_NUMERIC(1); int result; result = cmp_numerics(num1, num2); PG_FREE_IF_COPY(num1, 0); PG_FREE_IF_COPY(num2, 1); PG_RETURN_INT32(result); } Datum numeric_eq(PG_FUNCTION_ARGS) { Numeric num1 = PG_GETARG_NUMERIC(0); Numeric num2 = PG_GETARG_NUMERIC(1); bool result; result = cmp_numerics(num1, num2) == 0; PG_FREE_IF_COPY(num1, 0); PG_FREE_IF_COPY(num2, 1); PG_RETURN_BOOL(result); } Datum numeric_ne(PG_FUNCTION_ARGS) { Numeric num1 = PG_GETARG_NUMERIC(0); Numeric num2 = PG_GETARG_NUMERIC(1); bool result; result = cmp_numerics(num1, num2) != 0; PG_FREE_IF_COPY(num1, 0); PG_FREE_IF_COPY(num2, 1); PG_RETURN_BOOL(result); } Datum numeric_gt(PG_FUNCTION_ARGS) { Numeric num1 = PG_GETARG_NUMERIC(0); Numeric num2 = PG_GETARG_NUMERIC(1); bool result; result = cmp_numerics(num1, num2) > 0; PG_FREE_IF_COPY(num1, 0); PG_FREE_IF_COPY(num2, 1); PG_RETURN_BOOL(result); } Datum numeric_ge(PG_FUNCTION_ARGS) { Numeric num1 = PG_GETARG_NUMERIC(0); Numeric num2 = PG_GETARG_NUMERIC(1); bool result; result = cmp_numerics(num1, num2) >= 0; PG_FREE_IF_COPY(num1, 0); PG_FREE_IF_COPY(num2, 1); PG_RETURN_BOOL(result); } Datum numeric_lt(PG_FUNCTION_ARGS) { Numeric num1 = PG_GETARG_NUMERIC(0); Numeric num2 = PG_GETARG_NUMERIC(1); bool result; result = cmp_numerics(num1, num2) < 0; PG_FREE_IF_COPY(num1, 0); PG_FREE_IF_COPY(num2, 1); PG_RETURN_BOOL(result); } Datum numeric_le(PG_FUNCTION_ARGS) { Numeric num1 = PG_GETARG_NUMERIC(0); Numeric num2 = PG_GETARG_NUMERIC(1); bool result; result = cmp_numerics(num1, num2) <= 0; PG_FREE_IF_COPY(num1, 0); PG_FREE_IF_COPY(num2, 1); PG_RETURN_BOOL(result); } static int cmp_numerics(Numeric num1, Numeric num2) { int result; /* * We consider all NANs to be equal and larger than any non-NAN. This is * somewhat arbitrary; the important thing is to have a consistent sort * order. */ if (NUMERIC_IS_NAN(num1)) { if (NUMERIC_IS_NAN(num2)) result = 0; /* NAN = NAN */ else result = 1; /* NAN > non-NAN */ } else if (NUMERIC_IS_NAN(num2)) { result = -1; /* non-NAN < NAN */ } else { result = cmp_var_common(NUMERIC_DIGITS(num1), NUMERIC_NDIGITS(num1), num1->n_weight, NUMERIC_SIGN(num1), NUMERIC_DIGITS(num2), NUMERIC_NDIGITS(num2), num2->n_weight, NUMERIC_SIGN(num2)); } return result; } Datum hash_numeric(PG_FUNCTION_ARGS) { Numeric key = PG_GETARG_NUMERIC(0); Datum digit_hash; Datum result; int weight; int start_offset; int end_offset; int i; int hash_len; /* If it's NaN, don't try to hash the rest of the fields */ if (NUMERIC_IS_NAN(key)) PG_RETURN_UINT32(0); weight = key->n_weight; start_offset = 0; end_offset = 0; /* * Omit any leading or trailing zeros from the input to the hash. The * numeric implementation *should* guarantee that leading and trailing * zeros are suppressed, but we're paranoid. Note that we measure the * starting and ending offsets in units of NumericDigits, not bytes. */ for (i = 0; i < NUMERIC_NDIGITS(key); i++) { if (NUMERIC_DIGITS(key)[i] != (NumericDigit) 0) break; start_offset++; /* * The weight is effectively the # of digits before the decimal point, * so decrement it for each leading zero we skip. */ weight--; } /* * If there are no non-zero digits, then the value of the number is zero, * regardless of any other fields. */ if (NUMERIC_NDIGITS(key) == start_offset) PG_RETURN_UINT32(-1); for (i = NUMERIC_NDIGITS(key) - 1; i >= 0; i--) { if (NUMERIC_DIGITS(key)[i] != (NumericDigit) 0) break; end_offset++; } /* If we get here, there should be at least one non-zero digit */ Assert(start_offset + end_offset < NUMERIC_NDIGITS(key)); /* * Note that we don't hash on the Numeric's scale, since two numerics can * compare equal but have different scales. We also don't hash on the * sign, although we could: since a sign difference implies inequality, * this shouldn't affect correctness. */ hash_len = NUMERIC_NDIGITS(key) - start_offset - end_offset; digit_hash = hash_any((unsigned char *) (NUMERIC_DIGITS(key) + start_offset), hash_len * sizeof(NumericDigit)); /* Mix in the weight, via XOR */ result = digit_hash ^ weight; PG_RETURN_DATUM(result); } /* ---------------------------------------------------------------------- * * Basic arithmetic functions * * ---------------------------------------------------------------------- */ /* * numeric_add() - * * Add two numerics */ Datum numeric_add(PG_FUNCTION_ARGS) { Numeric num1 = PG_GETARG_NUMERIC(0); Numeric num2 = PG_GETARG_NUMERIC(1); NumericVar arg1; NumericVar arg2; NumericVar result; Numeric res; /* * Handle NaN */ if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2)) PG_RETURN_NUMERIC(make_result(&const_nan)); /* * Unpack the values, let add_var() compute the result and return it. */ init_var(&arg1); init_var(&arg2); init_var(&result); set_var_from_num(num1, &arg1); set_var_from_num(num2, &arg2); add_var(&arg1, &arg2, &result); res = make_result(&result); free_var(&arg1); free_var(&arg2); free_var(&result); PG_RETURN_NUMERIC(res); } /* * numeric_sub() - * * Subtract one numeric from another */ Datum numeric_sub(PG_FUNCTION_ARGS) { Numeric num1 = PG_GETARG_NUMERIC(0); Numeric num2 = PG_GETARG_NUMERIC(1); NumericVar arg1; NumericVar arg2; NumericVar result; Numeric res; /* * Handle NaN */ if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2)) PG_RETURN_NUMERIC(make_result(&const_nan)); /* * Unpack the values, let sub_var() compute the result and return it. */ init_var(&arg1); init_var(&arg2); init_var(&result); set_var_from_num(num1, &arg1); set_var_from_num(num2, &arg2); sub_var(&arg1, &arg2, &result); res = make_result(&result); free_var(&arg1); free_var(&arg2); free_var(&result); PG_RETURN_NUMERIC(res); } /* * numeric_mul() - * * Calculate the product of two numerics */ Datum numeric_mul(PG_FUNCTION_ARGS) { Numeric num1 = PG_GETARG_NUMERIC(0); Numeric num2 = PG_GETARG_NUMERIC(1); NumericVar arg1; NumericVar arg2; NumericVar result; Numeric res; /* * Handle NaN */ if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2)) PG_RETURN_NUMERIC(make_result(&const_nan)); /* * Unpack the values, let mul_var() compute the result and return it. * Unlike add_var() and sub_var(), mul_var() will round its result. In the * case of numeric_mul(), which is invoked for the * operator on numerics, * we request exact representation for the product (rscale = sum(dscale of * arg1, dscale of arg2)). */ init_var(&arg1); init_var(&arg2); init_var(&result); set_var_from_num(num1, &arg1); set_var_from_num(num2, &arg2); mul_var(&arg1, &arg2, &result, arg1.dscale + arg2.dscale); res = make_result(&result); free_var(&arg1); free_var(&arg2); free_var(&result); PG_RETURN_NUMERIC(res); } /* * numeric_div() - * * Divide one numeric into another */ Datum numeric_div(PG_FUNCTION_ARGS) { Numeric num1 = PG_GETARG_NUMERIC(0); Numeric num2 = PG_GETARG_NUMERIC(1); NumericVar arg1; NumericVar arg2; NumericVar result; Numeric res; int rscale; /* * Handle NaN */ if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2)) PG_RETURN_NUMERIC(make_result(&const_nan)); /* * Unpack the arguments */ init_var(&arg1); init_var(&arg2); init_var(&result); set_var_from_num(num1, &arg1); set_var_from_num(num2, &arg2); /* * Select scale for division result */ rscale = select_div_scale(&arg1, &arg2); /* * Do the divide and return the result */ div_var(&arg1, &arg2, &result, rscale, true); res = make_result(&result); free_var(&arg1); free_var(&arg2); free_var(&result); PG_RETURN_NUMERIC(res); } /* * numeric_div_trunc() - * * Divide one numeric into another, truncating the result to an integer */ Datum numeric_div_trunc(PG_FUNCTION_ARGS) { Numeric num1 = PG_GETARG_NUMERIC(0); Numeric num2 = PG_GETARG_NUMERIC(1); NumericVar arg1; NumericVar arg2; NumericVar result; Numeric res; /* * Handle NaN */ if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2)) PG_RETURN_NUMERIC(make_result(&const_nan)); /* * Unpack the arguments */ init_var(&arg1); init_var(&arg2); init_var(&result); set_var_from_num(num1, &arg1); set_var_from_num(num2, &arg2); /* * Do the divide and return the result */ div_var(&arg1, &arg2, &result, 0, false); res = make_result(&result); free_var(&arg1); free_var(&arg2); free_var(&result); PG_RETURN_NUMERIC(res); } /* * numeric_mod() - * * Calculate the modulo of two numerics */ Datum numeric_mod(PG_FUNCTION_ARGS) { Numeric num1 = PG_GETARG_NUMERIC(0); Numeric num2 = PG_GETARG_NUMERIC(1); Numeric res; NumericVar arg1; NumericVar arg2; NumericVar result; if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2)) PG_RETURN_NUMERIC(make_result(&const_nan)); init_var(&arg1); init_var(&arg2); init_var(&result); set_var_from_num(num1, &arg1); set_var_from_num(num2, &arg2); mod_var(&arg1, &arg2, &result); res = make_result(&result); free_var(&result); free_var(&arg2); free_var(&arg1); PG_RETURN_NUMERIC(res); } /* * numeric_inc() - * * Increment a number by one */ Datum numeric_inc(PG_FUNCTION_ARGS) { Numeric num = PG_GETARG_NUMERIC(0); NumericVar arg; Numeric res; /* * Handle NaN */ if (NUMERIC_IS_NAN(num)) PG_RETURN_NUMERIC(make_result(&const_nan)); /* * Compute the result and return it */ init_var(&arg); set_var_from_num(num, &arg); add_var(&arg, &const_one, &arg); res = make_result(&arg); free_var(&arg); PG_RETURN_NUMERIC(res); } /* * numeric_smaller() - * * Return the smaller of two numbers */ Datum numeric_smaller(PG_FUNCTION_ARGS) { Numeric num1 = PG_GETARG_NUMERIC(0); Numeric num2 = PG_GETARG_NUMERIC(1); /* * Use cmp_numerics so that this will agree with the comparison operators, * particularly as regards comparisons involving NaN. */ if (cmp_numerics(num1, num2) < 0) PG_RETURN_NUMERIC(num1); else PG_RETURN_NUMERIC(num2); } /* * numeric_larger() - * * Return the larger of two numbers */ Datum numeric_larger(PG_FUNCTION_ARGS) { Numeric num1 = PG_GETARG_NUMERIC(0); Numeric num2 = PG_GETARG_NUMERIC(1); /* * Use cmp_numerics so that this will agree with the comparison operators, * particularly as regards comparisons involving NaN. */ if (cmp_numerics(num1, num2) > 0) PG_RETURN_NUMERIC(num1); else PG_RETURN_NUMERIC(num2); } /* ---------------------------------------------------------------------- * * Advanced math functions * * ---------------------------------------------------------------------- */ /* * numeric_fac() * * Compute factorial */ Datum numeric_fac(PG_FUNCTION_ARGS) { int64 num = PG_GETARG_INT64(0); Numeric res; NumericVar fact; NumericVar result; if (num <= 1) { res = make_result(&const_one); PG_RETURN_NUMERIC(res); } /* Fail immediately if the result would overflow */ if (num > 32177) ereport(ERROR, (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), errmsg("value overflows numeric format"))); init_var(&fact); init_var(&result); int8_to_numericvar(num, &result); for (num = num - 1; num > 1; num--) { /* this loop can take awhile, so allow it to be interrupted */ CHECK_FOR_INTERRUPTS(); int8_to_numericvar(num, &fact); mul_var(&result, &fact, &result, 0); } res = make_result(&result); free_var(&fact); free_var(&result); PG_RETURN_NUMERIC(res); } /* * numeric_sqrt() - * * Compute the square root of a numeric. */ Datum numeric_sqrt(PG_FUNCTION_ARGS) { Numeric num = PG_GETARG_NUMERIC(0); Numeric res; NumericVar arg; NumericVar result; int sweight; int rscale; /* * Handle NaN */ if (NUMERIC_IS_NAN(num)) PG_RETURN_NUMERIC(make_result(&const_nan)); /* * Unpack the argument and determine the result scale. We choose a scale * to give at least NUMERIC_MIN_SIG_DIGITS significant digits; but in any * case not less than the input's dscale. */ init_var(&arg); init_var(&result); set_var_from_num(num, &arg); /* Assume the input was normalized, so arg.weight is accurate */ sweight = (arg.weight + 1) * DEC_DIGITS / 2 - 1; rscale = NUMERIC_MIN_SIG_DIGITS - sweight; rscale = Max(rscale, arg.dscale); rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE); rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE); /* * Let sqrt_var() do the calculation and return the result. */ sqrt_var(&arg, &result, rscale); res = make_result(&result); free_var(&result); free_var(&arg); PG_RETURN_NUMERIC(res); } /* * numeric_exp() - * * Raise e to the power of x */ Datum numeric_exp(PG_FUNCTION_ARGS) { Numeric num = PG_GETARG_NUMERIC(0); Numeric res; NumericVar arg; NumericVar result; int rscale; double val; /* * Handle NaN */ if (NUMERIC_IS_NAN(num)) PG_RETURN_NUMERIC(make_result(&const_nan)); /* * Unpack the argument and determine the result scale. We choose a scale * to give at least NUMERIC_MIN_SIG_DIGITS significant digits; but in any * case not less than the input's dscale. */ init_var(&arg); init_var(&result); set_var_from_num(num, &arg); /* convert input to float8, ignoring overflow */ val = numericvar_to_double_no_overflow(&arg); /* * log10(result) = num * log10(e), so this is approximately the decimal * weight of the result: */ val *= 0.434294481903252; /* limit to something that won't cause integer overflow */ val = Max(val, -NUMERIC_MAX_RESULT_SCALE); val = Min(val, NUMERIC_MAX_RESULT_SCALE); rscale = NUMERIC_MIN_SIG_DIGITS - (int) val; rscale = Max(rscale, arg.dscale); rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE); rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE); /* * Let exp_var() do the calculation and return the result. */ exp_var(&arg, &result, rscale); res = make_result(&result); free_var(&result); free_var(&arg); PG_RETURN_NUMERIC(res); } /* * numeric_ln() - * * Compute the natural logarithm of x */ Datum numeric_ln(PG_FUNCTION_ARGS) { Numeric num = PG_GETARG_NUMERIC(0); Numeric res; NumericVar arg; NumericVar result; int dec_digits; int rscale; /* * Handle NaN */ if (NUMERIC_IS_NAN(num)) PG_RETURN_NUMERIC(make_result(&const_nan)); init_var(&arg); init_var(&result); set_var_from_num(num, &arg); /* Approx decimal digits before decimal point */ dec_digits = (arg.weight + 1) * DEC_DIGITS; if (dec_digits > 1) rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(dec_digits - 1); else if (dec_digits < 1) rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(1 - dec_digits); else rscale = NUMERIC_MIN_SIG_DIGITS; rscale = Max(rscale, arg.dscale); rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE); rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE); ln_var(&arg, &result, rscale); res = make_result(&result); free_var(&result); free_var(&arg); PG_RETURN_NUMERIC(res); } /* * numeric_log() - * * Compute the logarithm of x in a given base */ Datum numeric_log(PG_FUNCTION_ARGS) { Numeric num1 = PG_GETARG_NUMERIC(0); Numeric num2 = PG_GETARG_NUMERIC(1); Numeric res; NumericVar arg1; NumericVar arg2; NumericVar result; /* * Handle NaN */ if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2)) PG_RETURN_NUMERIC(make_result(&const_nan)); /* * Initialize things */ init_var(&arg1); init_var(&arg2); init_var(&result); set_var_from_num(num1, &arg1); set_var_from_num(num2, &arg2); /* * Call log_var() to compute and return the result; note it handles scale * selection itself. */ log_var(&arg1, &arg2, &result); res = make_result(&result); free_var(&result); free_var(&arg2); free_var(&arg1); PG_RETURN_NUMERIC(res); } /* * numeric_power() - * * Raise b to the power of x */ Datum numeric_power(PG_FUNCTION_ARGS) { Numeric num1 = PG_GETARG_NUMERIC(0); Numeric num2 = PG_GETARG_NUMERIC(1); Numeric res; NumericVar arg1; NumericVar arg2; NumericVar arg2_trunc; NumericVar result; /* * Handle NaN */ if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2)) PG_RETURN_NUMERIC(make_result(&const_nan)); /* * Initialize things */ init_var(&arg1); init_var(&arg2); init_var(&arg2_trunc); init_var(&result); set_var_from_num(num1, &arg1); set_var_from_num(num2, &arg2); set_var_from_var(&arg2, &arg2_trunc); trunc_var(&arg2_trunc, 0); /* * The SQL spec requires that we emit a particular SQLSTATE error code for * certain error conditions. Specifically, we don't return a * divide-by-zero error code for 0 ^ -1. */ if (cmp_var(&arg1, &const_zero) == 0 && cmp_var(&arg2, &const_zero) < 0) ereport(ERROR, (errcode(ERRCODE_INVALID_ARGUMENT_FOR_POWER_FUNCTION), errmsg("zero raised to a negative power is undefined"))); if (cmp_var(&arg1, &const_zero) < 0 && cmp_var(&arg2, &arg2_trunc) != 0) ereport(ERROR, (errcode(ERRCODE_INVALID_ARGUMENT_FOR_POWER_FUNCTION), errmsg("a negative number raised to a non-integer power yields a complex result"))); /* * Call power_var() to compute and return the result; note it handles * scale selection itself. */ power_var(&arg1, &arg2, &result); res = make_result(&result); free_var(&result); free_var(&arg2); free_var(&arg2_trunc); free_var(&arg1); PG_RETURN_NUMERIC(res); } /* ---------------------------------------------------------------------- * * Type conversion functions * * ---------------------------------------------------------------------- */ Datum int4_numeric(PG_FUNCTION_ARGS) { int32 val = PG_GETARG_INT32(0); Numeric res; NumericVar result; init_var(&result); int8_to_numericvar((int64) val, &result); res = make_result(&result); free_var(&result); PG_RETURN_NUMERIC(res); } Datum numeric_int4(PG_FUNCTION_ARGS) { Numeric num = PG_GETARG_NUMERIC(0); NumericVar x; int32 result; /* XXX would it be better to return NULL? */ if (NUMERIC_IS_NAN(num)) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot convert NaN to integer"))); /* Convert to variable format, then convert to int4 */ init_var(&x); set_var_from_num(num, &x); result = numericvar_to_int4(&x); free_var(&x); PG_RETURN_INT32(result); } /* * Given a NumericVar, convert it to an int32. If the NumericVar * exceeds the range of an int32, raise the appropriate error via * ereport(). The input NumericVar is *not* free'd. */ static int32 numericvar_to_int4(NumericVar *var) { int32 result; int64 val; if (!numericvar_to_int8(var, &val)) ereport(ERROR, (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), errmsg("integer out of range"))); /* Down-convert to int4 */ result = (int32) val; /* Test for overflow by reverse-conversion. */ if ((int64) result != val) ereport(ERROR, (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), errmsg("integer out of range"))); return result; } Datum int8_numeric(PG_FUNCTION_ARGS) { int64 val = PG_GETARG_INT64(0); Numeric res; NumericVar result; init_var(&result); int8_to_numericvar(val, &result); res = make_result(&result); free_var(&result); PG_RETURN_NUMERIC(res); } Datum numeric_int8(PG_FUNCTION_ARGS) { Numeric num = PG_GETARG_NUMERIC(0); NumericVar x; int64 result; /* XXX would it be better to return NULL? */ if (NUMERIC_IS_NAN(num)) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot convert NaN to bigint"))); /* Convert to variable format and thence to int8 */ init_var(&x); set_var_from_num(num, &x); if (!numericvar_to_int8(&x, &result)) ereport(ERROR, (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), errmsg("bigint out of range"))); free_var(&x); PG_RETURN_INT64(result); } Datum int2_numeric(PG_FUNCTION_ARGS) { int16 val = PG_GETARG_INT16(0); Numeric res; NumericVar result; init_var(&result); int8_to_numericvar((int64) val, &result); res = make_result(&result); free_var(&result); PG_RETURN_NUMERIC(res); } Datum numeric_int2(PG_FUNCTION_ARGS) { Numeric num = PG_GETARG_NUMERIC(0); NumericVar x; int64 val; int16 result; /* XXX would it be better to return NULL? */ if (NUMERIC_IS_NAN(num)) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot convert NaN to smallint"))); /* Convert to variable format and thence to int8 */ init_var(&x); set_var_from_num(num, &x); if (!numericvar_to_int8(&x, &val)) ereport(ERROR, (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), errmsg("smallint out of range"))); free_var(&x); /* Down-convert to int2 */ result = (int16) val; /* Test for overflow by reverse-conversion. */ if ((int64) result != val) ereport(ERROR, (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), errmsg("smallint out of range"))); PG_RETURN_INT16(result); } Datum float8_numeric(PG_FUNCTION_ARGS) { float8 val = PG_GETARG_FLOAT8(0); Numeric res; NumericVar result; char buf[DBL_DIG + 100]; if (isnan(val)) PG_RETURN_NUMERIC(make_result(&const_nan)); sprintf(buf, "%.*g", DBL_DIG, val); init_var(&result); /* Assume we need not worry about leading/trailing spaces */ (void) set_var_from_str(buf, buf, &result); res = make_result(&result); free_var(&result); PG_RETURN_NUMERIC(res); } Datum numeric_float8(PG_FUNCTION_ARGS) { Numeric num = PG_GETARG_NUMERIC(0); char *tmp; Datum result; if (NUMERIC_IS_NAN(num)) PG_RETURN_FLOAT8(get_float8_nan()); tmp = DatumGetCString(DirectFunctionCall1(numeric_out, NumericGetDatum(num))); result = DirectFunctionCall1(float8in, CStringGetDatum(tmp)); pfree(tmp); PG_RETURN_DATUM(result); } /* Convert numeric to float8; if out of range, return +/- HUGE_VAL */ Datum numeric_float8_no_overflow(PG_FUNCTION_ARGS) { Numeric num = PG_GETARG_NUMERIC(0); double val; if (NUMERIC_IS_NAN(num)) PG_RETURN_FLOAT8(get_float8_nan()); val = numeric_to_double_no_overflow(num); PG_RETURN_FLOAT8(val); } Datum float4_numeric(PG_FUNCTION_ARGS) { float4 val = PG_GETARG_FLOAT4(0); Numeric res; NumericVar result; char buf[FLT_DIG + 100]; if (isnan(val)) PG_RETURN_NUMERIC(make_result(&const_nan)); sprintf(buf, "%.*g", FLT_DIG, val); init_var(&result); /* Assume we need not worry about leading/trailing spaces */ (void) set_var_from_str(buf, buf, &result); res = make_result(&result); free_var(&result); PG_RETURN_NUMERIC(res); } Datum numeric_float4(PG_FUNCTION_ARGS) { Numeric num = PG_GETARG_NUMERIC(0); char *tmp; Datum result; if (NUMERIC_IS_NAN(num)) PG_RETURN_FLOAT4(get_float4_nan()); tmp = DatumGetCString(DirectFunctionCall1(numeric_out, NumericGetDatum(num))); result = DirectFunctionCall1(float4in, CStringGetDatum(tmp)); pfree(tmp); PG_RETURN_DATUM(result); } /* ---------------------------------------------------------------------- * * Aggregate functions * * The transition datatype for all these aggregates is a 3-element array * of Numeric, holding the values N, sum(X), sum(X*X) in that order. * * We represent N as a numeric mainly to avoid having to build a special * datatype; it's unlikely it'd overflow an int4, but ... * * ---------------------------------------------------------------------- */ static ArrayType * do_numeric_accum(ArrayType *transarray, Numeric newval) { Datum *transdatums; int ndatums; Datum N, sumX, sumX2; ArrayType *result; /* We assume the input is array of numeric */ deconstruct_array(transarray, NUMERICOID, -1, false, 'i', &transdatums, NULL, &ndatums); if (ndatums != 3) elog(ERROR, "expected 3-element numeric array"); N = transdatums[0]; sumX = transdatums[1]; sumX2 = transdatums[2]; N = DirectFunctionCall1(numeric_inc, N); sumX = DirectFunctionCall2(numeric_add, sumX, NumericGetDatum(newval)); sumX2 = DirectFunctionCall2(numeric_add, sumX2, DirectFunctionCall2(numeric_mul, NumericGetDatum(newval), NumericGetDatum(newval))); transdatums[0] = N; transdatums[1] = sumX; transdatums[2] = sumX2; result = construct_array(transdatums, 3, NUMERICOID, -1, false, 'i'); return result; } /* * Improve avg performance by not caclulating sum(X*X). */ static ArrayType * do_numeric_avg_accum(ArrayType *transarray, Numeric newval) { Datum *transdatums; int ndatums; Datum N, sumX; ArrayType *result; /* We assume the input is array of numeric */ deconstruct_array(transarray, NUMERICOID, -1, false, 'i', &transdatums, NULL, &ndatums); if (ndatums != 2) elog(ERROR, "expected 2-element numeric array"); N = transdatums[0]; sumX = transdatums[1]; N = DirectFunctionCall1(numeric_inc, N); sumX = DirectFunctionCall2(numeric_add, sumX, NumericGetDatum(newval)); transdatums[0] = N; transdatums[1] = sumX; result = construct_array(transdatums, 2, NUMERICOID, -1, false, 'i'); return result; } Datum numeric_accum(PG_FUNCTION_ARGS) { ArrayType *transarray = PG_GETARG_ARRAYTYPE_P(0); Numeric newval = PG_GETARG_NUMERIC(1); PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval)); } /* * Optimized case for average of numeric. */ Datum numeric_avg_accum(PG_FUNCTION_ARGS) { ArrayType *transarray = PG_GETARG_ARRAYTYPE_P(0); Numeric newval = PG_GETARG_NUMERIC(1); PG_RETURN_ARRAYTYPE_P(do_numeric_avg_accum(transarray, newval)); } /* * Integer data types all use Numeric accumulators to share code and * avoid risk of overflow. For int2 and int4 inputs, Numeric accumulation * is overkill for the N and sum(X) values, but definitely not overkill * for the sum(X*X) value. Hence, we use int2_accum and int4_accum only * for stddev/variance --- there are faster special-purpose accumulator * routines for SUM and AVG of these datatypes. */ Datum int2_accum(PG_FUNCTION_ARGS) { ArrayType *transarray = PG_GETARG_ARRAYTYPE_P(0); Datum newval2 = PG_GETARG_DATUM(1); Numeric newval; newval = DatumGetNumeric(DirectFunctionCall1(int2_numeric, newval2)); PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval)); } Datum int4_accum(PG_FUNCTION_ARGS) { ArrayType *transarray = PG_GETARG_ARRAYTYPE_P(0); Datum newval4 = PG_GETARG_DATUM(1); Numeric newval; newval = DatumGetNumeric(DirectFunctionCall1(int4_numeric, newval4)); PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval)); } Datum int8_accum(PG_FUNCTION_ARGS) { ArrayType *transarray = PG_GETARG_ARRAYTYPE_P(0); Datum newval8 = PG_GETARG_DATUM(1); Numeric newval; newval = DatumGetNumeric(DirectFunctionCall1(int8_numeric, newval8)); PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval)); } /* * Optimized case for average of int8. */ Datum int8_avg_accum(PG_FUNCTION_ARGS) { ArrayType *transarray = PG_GETARG_ARRAYTYPE_P(0); Datum newval8 = PG_GETARG_DATUM(1); Numeric newval; newval = DatumGetNumeric(DirectFunctionCall1(int8_numeric, newval8)); PG_RETURN_ARRAYTYPE_P(do_numeric_avg_accum(transarray, newval)); } Datum numeric_avg(PG_FUNCTION_ARGS) { ArrayType *transarray = PG_GETARG_ARRAYTYPE_P(0); Datum *transdatums; int ndatums; Numeric N, sumX; /* We assume the input is array of numeric */ deconstruct_array(transarray, NUMERICOID, -1, false, 'i', &transdatums, NULL, &ndatums); if (ndatums != 2) elog(ERROR, "expected 2-element numeric array"); N = DatumGetNumeric(transdatums[0]); sumX = DatumGetNumeric(transdatums[1]); /* SQL92 defines AVG of no values to be NULL */ /* N is zero iff no digits (cf. numeric_uminus) */ if (VARSIZE(N) == NUMERIC_HDRSZ) PG_RETURN_NULL(); PG_RETURN_DATUM(DirectFunctionCall2(numeric_div, NumericGetDatum(sumX), NumericGetDatum(N))); } /* * Workhorse routine for the standard deviance and variance * aggregates. 'transarray' is the aggregate's transition * array. 'variance' specifies whether we should calculate the * variance or the standard deviation. 'sample' indicates whether the * caller is interested in the sample or the population * variance/stddev. * * If appropriate variance statistic is undefined for the input, * *is_null is set to true and NULL is returned. */ static Numeric numeric_stddev_internal(ArrayType *transarray, bool variance, bool sample, bool *is_null) { Datum *transdatums; int ndatums; Numeric N, sumX, sumX2, res; NumericVar vN, vsumX, vsumX2, vNminus1; NumericVar *comp; int rscale; *is_null = false; /* We assume the input is array of numeric */ deconstruct_array(transarray, NUMERICOID, -1, false, 'i', &transdatums, NULL, &ndatums); if (ndatums != 3) elog(ERROR, "expected 3-element numeric array"); N = DatumGetNumeric(transdatums[0]); sumX = DatumGetNumeric(transdatums[1]); sumX2 = DatumGetNumeric(transdatums[2]); if (NUMERIC_IS_NAN(N) || NUMERIC_IS_NAN(sumX) || NUMERIC_IS_NAN(sumX2)) return make_result(&const_nan); init_var(&vN); set_var_from_num(N, &vN); /* * Sample stddev and variance are undefined when N <= 1; population stddev * is undefined when N == 0. Return NULL in either case. */ if (sample) comp = &const_one; else comp = &const_zero; if (cmp_var(&vN, comp) <= 0) { free_var(&vN); *is_null = true; return NULL; } init_var(&vNminus1); sub_var(&vN, &const_one, &vNminus1); init_var(&vsumX); set_var_from_num(sumX, &vsumX); init_var(&vsumX2); set_var_from_num(sumX2, &vsumX2); /* compute rscale for mul_var calls */ rscale = vsumX.dscale * 2; mul_var(&vsumX, &vsumX, &vsumX, rscale); /* vsumX = sumX * sumX */ mul_var(&vN, &vsumX2, &vsumX2, rscale); /* vsumX2 = N * sumX2 */ sub_var(&vsumX2, &vsumX, &vsumX2); /* N * sumX2 - sumX * sumX */ if (cmp_var(&vsumX2, &const_zero) <= 0) { /* Watch out for roundoff error producing a negative numerator */ res = make_result(&const_zero); } else { if (sample) mul_var(&vN, &vNminus1, &vNminus1, 0); /* N * (N - 1) */ else mul_var(&vN, &vN, &vNminus1, 0); /* N * N */ rscale = select_div_scale(&vsumX2, &vNminus1); div_var(&vsumX2, &vNminus1, &vsumX, rscale, true); /* variance */ if (!variance) sqrt_var(&vsumX, &vsumX, rscale); /* stddev */ res = make_result(&vsumX); } free_var(&vN); free_var(&vNminus1); free_var(&vsumX); free_var(&vsumX2); return res; } Datum numeric_var_samp(PG_FUNCTION_ARGS) { Numeric res; bool is_null; res = numeric_stddev_internal(PG_GETARG_ARRAYTYPE_P(0), true, true, &is_null); if (is_null) PG_RETURN_NULL(); else PG_RETURN_NUMERIC(res); } Datum numeric_stddev_samp(PG_FUNCTION_ARGS) { Numeric res; bool is_null; res = numeric_stddev_internal(PG_GETARG_ARRAYTYPE_P(0), false, true, &is_null); if (is_null) PG_RETURN_NULL(); else PG_RETURN_NUMERIC(res); } Datum numeric_var_pop(PG_FUNCTION_ARGS) { Numeric res; bool is_null; res = numeric_stddev_internal(PG_GETARG_ARRAYTYPE_P(0), true, false, &is_null); if (is_null) PG_RETURN_NULL(); else PG_RETURN_NUMERIC(res); } Datum numeric_stddev_pop(PG_FUNCTION_ARGS) { Numeric res; bool is_null; res = numeric_stddev_internal(PG_GETARG_ARRAYTYPE_P(0), false, false, &is_null); if (is_null) PG_RETURN_NULL(); else PG_RETURN_NUMERIC(res); } /* * SUM transition functions for integer datatypes. * * To avoid overflow, we use accumulators wider than the input datatype. * A Numeric accumulator is needed for int8 input; for int4 and int2 * inputs, we use int8 accumulators which should be sufficient for practical * purposes. (The latter two therefore don't really belong in this file, * but we keep them here anyway.) * * Because SQL92 defines the SUM() of no values to be NULL, not zero, * the initial condition of the transition data value needs to be NULL. This * means we can't rely on ExecAgg to automatically insert the first non-null * data value into the transition data: it doesn't know how to do the type * conversion. The upshot is that these routines have to be marked non-strict * and handle substitution of the first non-null input themselves. */ Datum int2_sum(PG_FUNCTION_ARGS) { int64 newval; if (PG_ARGISNULL(0)) { /* No non-null input seen so far... */ if (PG_ARGISNULL(1)) PG_RETURN_NULL(); /* still no non-null */ /* This is the first non-null input. */ newval = (int64) PG_GETARG_INT16(1); PG_RETURN_INT64(newval); } /* * If we're invoked by nodeAgg, we can cheat and modify our first * parameter in-place to avoid palloc overhead. If not, we need to return * the new value of the transition variable. (If int8 is pass-by-value, * then of course this is useless as well as incorrect, so just ifdef it * out.) */ #ifndef USE_FLOAT8_BYVAL /* controls int8 too */ if (fcinfo->context && (IsA(fcinfo->context, AggState) || IsA(fcinfo->context, WindowAggState))) { int64 *oldsum = (int64 *) PG_GETARG_POINTER(0); /* Leave the running sum unchanged in the new input is null */ if (!PG_ARGISNULL(1)) *oldsum = *oldsum + (int64) PG_GETARG_INT16(1); PG_RETURN_POINTER(oldsum); } else #endif { int64 oldsum = PG_GETARG_INT64(0); /* Leave sum unchanged if new input is null. */ if (PG_ARGISNULL(1)) PG_RETURN_INT64(oldsum); /* OK to do the addition. */ newval = oldsum + (int64) PG_GETARG_INT16(1); PG_RETURN_INT64(newval); } } Datum int4_sum(PG_FUNCTION_ARGS) { int64 newval; if (PG_ARGISNULL(0)) { /* No non-null input seen so far... */ if (PG_ARGISNULL(1)) PG_RETURN_NULL(); /* still no non-null */ /* This is the first non-null input. */ newval = (int64) PG_GETARG_INT32(1); PG_RETURN_INT64(newval); } /* * If we're invoked by nodeAgg, we can cheat and modify our first * parameter in-place to avoid palloc overhead. If not, we need to return * the new value of the transition variable. (If int8 is pass-by-value, * then of course this is useless as well as incorrect, so just ifdef it * out.) */ #ifndef USE_FLOAT8_BYVAL /* controls int8 too */ if (fcinfo->context && (IsA(fcinfo->context, AggState) || IsA(fcinfo->context, WindowAggState))) { int64 *oldsum = (int64 *) PG_GETARG_POINTER(0); /* Leave the running sum unchanged in the new input is null */ if (!PG_ARGISNULL(1)) *oldsum = *oldsum + (int64) PG_GETARG_INT32(1); PG_RETURN_POINTER(oldsum); } else #endif { int64 oldsum = PG_GETARG_INT64(0); /* Leave sum unchanged if new input is null. */ if (PG_ARGISNULL(1)) PG_RETURN_INT64(oldsum); /* OK to do the addition. */ newval = oldsum + (int64) PG_GETARG_INT32(1); PG_RETURN_INT64(newval); } } Datum int8_sum(PG_FUNCTION_ARGS) { Numeric oldsum; Datum newval; if (PG_ARGISNULL(0)) { /* No non-null input seen so far... */ if (PG_ARGISNULL(1)) PG_RETURN_NULL(); /* still no non-null */ /* This is the first non-null input. */ newval = DirectFunctionCall1(int8_numeric, PG_GETARG_DATUM(1)); PG_RETURN_DATUM(newval); } /* * Note that we cannot special-case the nodeAgg case here, as we do for * int2_sum and int4_sum: numeric is of variable size, so we cannot modify * our first parameter in-place. */ oldsum = PG_GETARG_NUMERIC(0); /* Leave sum unchanged if new input is null. */ if (PG_ARGISNULL(1)) PG_RETURN_NUMERIC(oldsum); /* OK to do the addition. */ newval = DirectFunctionCall1(int8_numeric, PG_GETARG_DATUM(1)); PG_RETURN_DATUM(DirectFunctionCall2(numeric_add, NumericGetDatum(oldsum), newval)); } /* * Routines for avg(int2) and avg(int4). The transition datatype * is a two-element int8 array, holding count and sum. */ typedef struct Int8TransTypeData { #ifndef INT64_IS_BUSTED int64 count; int64 sum; #else /* "int64" isn't really 64 bits, so fake up properly-aligned fields */ int32 count; int32 pad1; int32 sum; int32 pad2; #endif } Int8TransTypeData; Datum int2_avg_accum(PG_FUNCTION_ARGS) { ArrayType *transarray; int16 newval = PG_GETARG_INT16(1); Int8TransTypeData *transdata; /* * If we're invoked by nodeAgg, we can cheat and modify our first * parameter in-place to reduce palloc overhead. Otherwise we need to make * a copy of it before scribbling on it. */ if (fcinfo->context && (IsA(fcinfo->context, AggState) || IsA(fcinfo->context, WindowAggState))) transarray = PG_GETARG_ARRAYTYPE_P(0); else transarray = PG_GETARG_ARRAYTYPE_P_COPY(0); if (ARR_HASNULL(transarray) || ARR_SIZE(transarray) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData)) elog(ERROR, "expected 2-element int8 array"); transdata = (Int8TransTypeData *) ARR_DATA_PTR(transarray); transdata->count++; transdata->sum += newval; PG_RETURN_ARRAYTYPE_P(transarray); } Datum int4_avg_accum(PG_FUNCTION_ARGS) { ArrayType *transarray; int32 newval = PG_GETARG_INT32(1); Int8TransTypeData *transdata; /* * If we're invoked by nodeAgg, we can cheat and modify our first * parameter in-place to reduce palloc overhead. Otherwise we need to make * a copy of it before scribbling on it. */ if (fcinfo->context && (IsA(fcinfo->context, AggState) || IsA(fcinfo->context, WindowAggState))) transarray = PG_GETARG_ARRAYTYPE_P(0); else transarray = PG_GETARG_ARRAYTYPE_P_COPY(0); if (ARR_HASNULL(transarray) || ARR_SIZE(transarray) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData)) elog(ERROR, "expected 2-element int8 array"); transdata = (Int8TransTypeData *) ARR_DATA_PTR(transarray); transdata->count++; transdata->sum += newval; PG_RETURN_ARRAYTYPE_P(transarray); } Datum int8_avg(PG_FUNCTION_ARGS) { ArrayType *transarray = PG_GETARG_ARRAYTYPE_P(0); Int8TransTypeData *transdata; Datum countd, sumd; if (ARR_HASNULL(transarray) || ARR_SIZE(transarray) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData)) elog(ERROR, "expected 2-element int8 array"); transdata = (Int8TransTypeData *) ARR_DATA_PTR(transarray); /* SQL92 defines AVG of no values to be NULL */ if (transdata->count == 0) PG_RETURN_NULL(); countd = DirectFunctionCall1(int8_numeric, Int64GetDatumFast(transdata->count)); sumd = DirectFunctionCall1(int8_numeric, Int64GetDatumFast(transdata->sum)); PG_RETURN_DATUM(DirectFunctionCall2(numeric_div, sumd, countd)); } /* ---------------------------------------------------------------------- * * Debug support * * ---------------------------------------------------------------------- */ #ifdef NUMERIC_DEBUG /* * dump_numeric() - Dump a value in the db storage format for debugging */ static void dump_numeric(const char *str, Numeric num) { NumericDigit *digits = NUMERIC_DIGITS(num); int ndigits; int i; ndigits = NUMERIC_NDIGITS(num); printf("%s: NUMERIC w=%d d=%d ", str, num->n_weight, NUMERIC_DSCALE(num)); switch (NUMERIC_SIGN(num)) { case NUMERIC_POS: printf("POS"); break; case NUMERIC_NEG: printf("NEG"); break; case NUMERIC_NAN: printf("NaN"); break; default: printf("SIGN=0x%x", NUMERIC_SIGN(num)); break; } for (i = 0; i < ndigits; i++) printf(" %0*d", DEC_DIGITS, digits[i]); printf("\n"); } /* * dump_var() - Dump a value in the variable format for debugging */ static void dump_var(const char *str, NumericVar *var) { int i; printf("%s: VAR w=%d d=%d ", str, var->weight, var->dscale); switch (var->sign) { case NUMERIC_POS: printf("POS"); break; case NUMERIC_NEG: printf("NEG"); break; case NUMERIC_NAN: printf("NaN"); break; default: printf("SIGN=0x%x", var->sign); break; } for (i = 0; i < var->ndigits; i++) printf(" %0*d", DEC_DIGITS, var->digits[i]); printf("\n"); } #endif /* NUMERIC_DEBUG */ /* ---------------------------------------------------------------------- * * Local functions follow * * In general, these do not support NaNs --- callers must eliminate * the possibility of NaN first. (make_result() is an exception.) * * ---------------------------------------------------------------------- */ /* * alloc_var() - * * Allocate a digit buffer of ndigits digits (plus a spare digit for rounding) */ static void alloc_var(NumericVar *var, int ndigits) { digitbuf_free(var->buf); var->buf = digitbuf_alloc(ndigits + 1); var->buf[0] = 0; /* spare digit for rounding */ var->digits = var->buf + 1; var->ndigits = ndigits; } /* * free_var() - * * Return the digit buffer of a variable to the free pool */ static void free_var(NumericVar *var) { digitbuf_free(var->buf); var->buf = NULL; var->digits = NULL; var->sign = NUMERIC_NAN; } /* * zero_var() - * * Set a variable to ZERO. * Note: its dscale is not touched. */ static void zero_var(NumericVar *var) { digitbuf_free(var->buf); var->buf = NULL; var->digits = NULL; var->ndigits = 0; var->weight = 0; /* by convention; doesn't really matter */ var->sign = NUMERIC_POS; /* anything but NAN... */ } /* * set_var_from_str() * * Parse a string and put the number into a variable * * This function does not handle leading or trailing spaces, and it doesn't * accept "NaN" either. It returns the end+1 position so that caller can * check for trailing spaces/garbage if deemed necessary. * * cp is the place to actually start parsing; str is what to use in error * reports. (Typically cp would be the same except advanced over spaces.) */ static const char * set_var_from_str(const char *str, const char *cp, NumericVar *dest) { bool have_dp = FALSE; int i; unsigned char *decdigits; int sign = NUMERIC_POS; int dweight = -1; int ddigits; int dscale = 0; int weight; int ndigits; int offset; NumericDigit *digits; /* * We first parse the string to extract decimal digits and determine the * correct decimal weight. Then convert to NBASE representation. */ switch (*cp) { case '+': sign = NUMERIC_POS; cp++; break; case '-': sign = NUMERIC_NEG; cp++; break; } if (*cp == '.') { have_dp = TRUE; cp++; } if (!isdigit((unsigned char) *cp)) ereport(ERROR, (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), errmsg("invalid input syntax for type numeric: \"%s\"", str))); decdigits = (unsigned char *) palloc(strlen(cp) + DEC_DIGITS * 2); /* leading padding for digit alignment later */ memset(decdigits, 0, DEC_DIGITS); i = DEC_DIGITS; while (*cp) { if (isdigit((unsigned char) *cp)) { decdigits[i++] = *cp++ - '0'; if (!have_dp) dweight++; else dscale++; } else if (*cp == '.') { if (have_dp) ereport(ERROR, (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), errmsg("invalid input syntax for type numeric: \"%s\"", str))); have_dp = TRUE; cp++; } else break; } ddigits = i - DEC_DIGITS; /* trailing padding for digit alignment later */ memset(decdigits + i, 0, DEC_DIGITS - 1); /* Handle exponent, if any */ if (*cp == 'e' || *cp == 'E') { long exponent; char *endptr; cp++; exponent = strtol(cp, &endptr, 10); if (endptr == cp) ereport(ERROR, (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), errmsg("invalid input syntax for type numeric: \"%s\"", str))); cp = endptr; if (exponent > NUMERIC_MAX_PRECISION || exponent < -NUMERIC_MAX_PRECISION) ereport(ERROR, (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), errmsg("invalid input syntax for type numeric: \"%s\"", str))); dweight += (int) exponent; dscale -= (int) exponent; if (dscale < 0) dscale = 0; } /* * Okay, convert pure-decimal representation to base NBASE. First we need * to determine the converted weight and ndigits. offset is the number of * decimal zeroes to insert before the first given digit to have a * correctly aligned first NBASE digit. */ if (dweight >= 0) weight = (dweight + 1 + DEC_DIGITS - 1) / DEC_DIGITS - 1; else weight = -((-dweight - 1) / DEC_DIGITS + 1); offset = (weight + 1) * DEC_DIGITS - (dweight + 1); ndigits = (ddigits + offset + DEC_DIGITS - 1) / DEC_DIGITS; alloc_var(dest, ndigits); dest->sign = sign; dest->weight = weight; dest->dscale = dscale; i = DEC_DIGITS - offset; digits = dest->digits; while (ndigits-- > 0) { #if DEC_DIGITS == 4 *digits++ = ((decdigits[i] * 10 + decdigits[i + 1]) * 10 + decdigits[i + 2]) * 10 + decdigits[i + 3]; #elif DEC_DIGITS == 2 *digits++ = decdigits[i] * 10 + decdigits[i + 1]; #elif DEC_DIGITS == 1 *digits++ = decdigits[i]; #else #error unsupported NBASE #endif i += DEC_DIGITS; } pfree(decdigits); /* Strip any leading/trailing zeroes, and normalize weight if zero */ strip_var(dest); /* Return end+1 position for caller */ return cp; } /* * set_var_from_num() - * * Convert the packed db format into a variable */ static void set_var_from_num(Numeric num, NumericVar *dest) { int ndigits; ndigits = NUMERIC_NDIGITS(num); alloc_var(dest, ndigits); dest->weight = num->n_weight; dest->sign = NUMERIC_SIGN(num); dest->dscale = NUMERIC_DSCALE(num); memcpy(dest->digits, num->n_data, ndigits * sizeof(NumericDigit)); } /* * set_var_from_var() - * * Copy one variable into another */ static void set_var_from_var(NumericVar *value, NumericVar *dest) { NumericDigit *newbuf; newbuf = digitbuf_alloc(value->ndigits + 1); newbuf[0] = 0; /* spare digit for rounding */ memcpy(newbuf + 1, value->digits, value->ndigits * sizeof(NumericDigit)); digitbuf_free(dest->buf); memmove(dest, value, sizeof(NumericVar)); dest->buf = newbuf; dest->digits = newbuf + 1; } /* * get_str_from_var() - * * Convert a var to text representation (guts of numeric_out). * CAUTION: var's contents may be modified by rounding! * Returns a palloc'd string. */ static char * get_str_from_var(NumericVar *var, int dscale) { char *str; char *cp; char *endcp; int i; int d; NumericDigit dig; #if DEC_DIGITS > 1 NumericDigit d1; #endif if (dscale < 0) dscale = 0; /* * Check if we must round up before printing the value and do so. */ round_var(var, dscale); /* * Allocate space for the result. * * i is set to to # of decimal digits before decimal point. dscale is the * # of decimal digits we will print after decimal point. We may generate * as many as DEC_DIGITS-1 excess digits at the end, and in addition we * need room for sign, decimal point, null terminator. */ i = (var->weight + 1) * DEC_DIGITS; if (i <= 0) i = 1; str = palloc(i + dscale + DEC_DIGITS + 2); cp = str; /* * Output a dash for negative values */ if (var->sign == NUMERIC_NEG) *cp++ = '-'; /* * Output all digits before the decimal point */ if (var->weight < 0) { d = var->weight + 1; *cp++ = '0'; } else { for (d = 0; d <= var->weight; d++) { dig = (d < var->ndigits) ? var->digits[d] : 0; /* In the first digit, suppress extra leading decimal zeroes */ #if DEC_DIGITS == 4 { bool putit = (d > 0); d1 = dig / 1000; dig -= d1 * 1000; putit |= (d1 > 0); if (putit) *cp++ = d1 + '0'; d1 = dig / 100; dig -= d1 * 100; putit |= (d1 > 0); if (putit) *cp++ = d1 + '0'; d1 = dig / 10; dig -= d1 * 10; putit |= (d1 > 0); if (putit) *cp++ = d1 + '0'; *cp++ = dig + '0'; } #elif DEC_DIGITS == 2 d1 = dig / 10; dig -= d1 * 10; if (d1 > 0 || d > 0) *cp++ = d1 + '0'; *cp++ = dig + '0'; #elif DEC_DIGITS == 1 *cp++ = dig + '0'; #else #error unsupported NBASE #endif } } /* * If requested, output a decimal point and all the digits that follow it. * We initially put out a multiple of DEC_DIGITS digits, then truncate if * needed. */ if (dscale > 0) { *cp++ = '.'; endcp = cp + dscale; for (i = 0; i < dscale; d++, i += DEC_DIGITS) { dig = (d >= 0 && d < var->ndigits) ? var->digits[d] : 0; #if DEC_DIGITS == 4 d1 = dig / 1000; dig -= d1 * 1000; *cp++ = d1 + '0'; d1 = dig / 100; dig -= d1 * 100; *cp++ = d1 + '0'; d1 = dig / 10; dig -= d1 * 10; *cp++ = d1 + '0'; *cp++ = dig + '0'; #elif DEC_DIGITS == 2 d1 = dig / 10; dig -= d1 * 10; *cp++ = d1 + '0'; *cp++ = dig + '0'; #elif DEC_DIGITS == 1 *cp++ = dig + '0'; #else #error unsupported NBASE #endif } cp = endcp; } /* * terminate the string and return it */ *cp = '\0'; return str; } /* * make_result() - * * Create the packed db numeric format in palloc()'d memory from * a variable. */ static Numeric make_result(NumericVar *var) { Numeric result; NumericDigit *digits = var->digits; int weight = var->weight; int sign = var->sign; int n; Size len; if (sign == NUMERIC_NAN) { result = (Numeric) palloc(NUMERIC_HDRSZ); SET_VARSIZE(result, NUMERIC_HDRSZ); result->n_weight = 0; result->n_sign_dscale = NUMERIC_NAN; dump_numeric("make_result()", result); return result; } n = var->ndigits; /* truncate leading zeroes */ while (n > 0 && *digits == 0) { digits++; weight--; n--; } /* truncate trailing zeroes */ while (n > 0 && digits[n - 1] == 0) n--; /* If zero result, force to weight=0 and positive sign */ if (n == 0) { weight = 0; sign = NUMERIC_POS; } /* Build the result */ len = NUMERIC_HDRSZ + n * sizeof(NumericDigit); result = (Numeric) palloc(len); SET_VARSIZE(result, len); result->n_weight = weight; result->n_sign_dscale = sign | (var->dscale & NUMERIC_DSCALE_MASK); memcpy(result->n_data, digits, n * sizeof(NumericDigit)); /* Check for overflow of int16 fields */ if (result->n_weight != weight || NUMERIC_DSCALE(result) != var->dscale) ereport(ERROR, (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), errmsg("value overflows numeric format"))); dump_numeric("make_result()", result); return result; } /* * apply_typmod() - * * Do bounds checking and rounding according to the attributes * typmod field. */ static void apply_typmod(NumericVar *var, int32 typmod) { int precision; int scale; int maxdigits; int ddigits; int i; /* Do nothing if we have a default typmod (-1) */ if (typmod < (int32) (VARHDRSZ)) return; typmod -= VARHDRSZ; precision = (typmod >> 16) & 0xffff; scale = typmod & 0xffff; maxdigits = precision - scale; /* Round to target scale (and set var->dscale) */ round_var(var, scale); /* * Check for overflow - note we can't do this before rounding, because * rounding could raise the weight. Also note that the var's weight could * be inflated by leading zeroes, which will be stripped before storage * but perhaps might not have been yet. In any case, we must recognize a * true zero, whose weight doesn't mean anything. */ ddigits = (var->weight + 1) * DEC_DIGITS; if (ddigits > maxdigits) { /* Determine true weight; and check for all-zero result */ for (i = 0; i < var->ndigits; i++) { NumericDigit dig = var->digits[i]; if (dig) { /* Adjust for any high-order decimal zero digits */ #if DEC_DIGITS == 4 if (dig < 10) ddigits -= 3; else if (dig < 100) ddigits -= 2; else if (dig < 1000) ddigits -= 1; #elif DEC_DIGITS == 2 if (dig < 10) ddigits -= 1; #elif DEC_DIGITS == 1 /* no adjustment */ #else #error unsupported NBASE #endif if (ddigits > maxdigits) ereport(ERROR, (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), errmsg("numeric field overflow"), errdetail("A field with precision %d, scale %d must round to an absolute value less than %s%d.", precision, scale, /* Display 10^0 as 1 */ maxdigits ? "10^" : "", maxdigits ? maxdigits : 1 ))); break; } ddigits -= DEC_DIGITS; } } } /* * Convert numeric to int8, rounding if needed. * * If overflow, return FALSE (no error is raised). Return TRUE if okay. * * CAUTION: var's contents may be modified by rounding! */ static bool numericvar_to_int8(NumericVar *var, int64 *result) { NumericDigit *digits; int ndigits; int weight; int i; int64 val, oldval; bool neg; /* Round to nearest integer */ round_var(var, 0); /* Check for zero input */ strip_var(var); ndigits = var->ndigits; if (ndigits == 0) { *result = 0; return true; } /* * For input like 10000000000, we must treat stripped digits as real. So * the loop assumes there are weight+1 digits before the decimal point. */ weight = var->weight; Assert(weight >= 0 && ndigits <= weight + 1); /* Construct the result */ digits = var->digits; neg = (var->sign == NUMERIC_NEG); val = digits[0]; for (i = 1; i <= weight; i++) { oldval = val; val *= NBASE; if (i < ndigits) val += digits[i]; /* * The overflow check is a bit tricky because we want to accept * INT64_MIN, which will overflow the positive accumulator. We can * detect this case easily though because INT64_MIN is the only * nonzero value for which -val == val (on a two's complement machine, * anyway). */ if ((val / NBASE) != oldval) /* possible overflow? */ { if (!neg || (-val) != val || val == 0 || oldval < 0) return false; } } *result = neg ? -val : val; return true; } /* * Convert int8 value to numeric. */ static void int8_to_numericvar(int64 val, NumericVar *var) { uint64 uval, newuval; NumericDigit *ptr; int ndigits; /* int8 can require at most 19 decimal digits; add one for safety */ alloc_var(var, 20 / DEC_DIGITS); if (val < 0) { var->sign = NUMERIC_NEG; uval = -val; } else { var->sign = NUMERIC_POS; uval = val; } var->dscale = 0; if (val == 0) { var->ndigits = 0; var->weight = 0; return; } ptr = var->digits + var->ndigits; ndigits = 0; do { ptr--; ndigits++; newuval = uval / NBASE; *ptr = uval - newuval * NBASE; uval = newuval; } while (uval); var->digits = ptr; var->ndigits = ndigits; var->weight = ndigits - 1; } /* * Convert numeric to float8; if out of range, return +/- HUGE_VAL */ static double numeric_to_double_no_overflow(Numeric num) { char *tmp; double val; char *endptr; tmp = DatumGetCString(DirectFunctionCall1(numeric_out, NumericGetDatum(num))); /* unlike float8in, we ignore ERANGE from strtod */ val = strtod(tmp, &endptr); if (*endptr != '\0') { /* shouldn't happen ... */ ereport(ERROR, (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), errmsg("invalid input syntax for type double precision: \"%s\"", tmp))); } pfree(tmp); return val; } /* As above, but work from a NumericVar */ static double numericvar_to_double_no_overflow(NumericVar *var) { char *tmp; double val; char *endptr; tmp = get_str_from_var(var, var->dscale); /* unlike float8in, we ignore ERANGE from strtod */ val = strtod(tmp, &endptr); if (*endptr != '\0') { /* shouldn't happen ... */ ereport(ERROR, (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), errmsg("invalid input syntax for type double precision: \"%s\"", tmp))); } pfree(tmp); return val; } /* * cmp_var() - * * Compare two values on variable level. We assume zeroes have been * truncated to no digits. */ static int cmp_var(NumericVar *var1, NumericVar *var2) { return cmp_var_common(var1->digits, var1->ndigits, var1->weight, var1->sign, var2->digits, var2->ndigits, var2->weight, var2->sign); } /* * cmp_var_common() - * * Main routine of cmp_var(). This function can be used by both * NumericVar and Numeric. */ static int cmp_var_common(const NumericDigit *var1digits, int var1ndigits, int var1weight, int var1sign, const NumericDigit *var2digits, int var2ndigits, int var2weight, int var2sign) { if (var1ndigits == 0) { if (var2ndigits == 0) return 0; if (var2sign == NUMERIC_NEG) return 1; return -1; } if (var2ndigits == 0) { if (var1sign == NUMERIC_POS) return 1; return -1; } if (var1sign == NUMERIC_POS) { if (var2sign == NUMERIC_NEG) return 1; return cmp_abs_common(var1digits, var1ndigits, var1weight, var2digits, var2ndigits, var2weight); } if (var2sign == NUMERIC_POS) return -1; return cmp_abs_common(var2digits, var2ndigits, var2weight, var1digits, var1ndigits, var1weight); } /* * add_var() - * * Full version of add functionality on variable level (handling signs). * result might point to one of the operands too without danger. */ static void add_var(NumericVar *var1, NumericVar *var2, NumericVar *result) { /* * Decide on the signs of the two variables what to do */ if (var1->sign == NUMERIC_POS) { if (var2->sign == NUMERIC_POS) { /* * Both are positive result = +(ABS(var1) + ABS(var2)) */ add_abs(var1, var2, result); result->sign = NUMERIC_POS; } else { /* * var1 is positive, var2 is negative Must compare absolute values */ switch (cmp_abs(var1, var2)) { case 0: /* ---------- * ABS(var1) == ABS(var2) * result = ZERO * ---------- */ zero_var(result); result->dscale = Max(var1->dscale, var2->dscale); break; case 1: /* ---------- * ABS(var1) > ABS(var2) * result = +(ABS(var1) - ABS(var2)) * ---------- */ sub_abs(var1, var2, result); result->sign = NUMERIC_POS; break; case -1: /* ---------- * ABS(var1) < ABS(var2) * result = -(ABS(var2) - ABS(var1)) * ---------- */ sub_abs(var2, var1, result); result->sign = NUMERIC_NEG; break; } } } else { if (var2->sign == NUMERIC_POS) { /* ---------- * var1 is negative, var2 is positive * Must compare absolute values * ---------- */ switch (cmp_abs(var1, var2)) { case 0: /* ---------- * ABS(var1) == ABS(var2) * result = ZERO * ---------- */ zero_var(result); result->dscale = Max(var1->dscale, var2->dscale); break; case 1: /* ---------- * ABS(var1) > ABS(var2) * result = -(ABS(var1) - ABS(var2)) * ---------- */ sub_abs(var1, var2, result); result->sign = NUMERIC_NEG; break; case -1: /* ---------- * ABS(var1) < ABS(var2) * result = +(ABS(var2) - ABS(var1)) * ---------- */ sub_abs(var2, var1, result); result->sign = NUMERIC_POS; break; } } else { /* ---------- * Both are negative * result = -(ABS(var1) + ABS(var2)) * ---------- */ add_abs(var1, var2, result); result->sign = NUMERIC_NEG; } } } /* * sub_var() - * * Full version of sub functionality on variable level (handling signs). * result might point to one of the operands too without danger. */ static void sub_var(NumericVar *var1, NumericVar *var2, NumericVar *result) { /* * Decide on the signs of the two variables what to do */ if (var1->sign == NUMERIC_POS) { if (var2->sign == NUMERIC_NEG) { /* ---------- * var1 is positive, var2 is negative * result = +(ABS(var1) + ABS(var2)) * ---------- */ add_abs(var1, var2, result); result->sign = NUMERIC_POS; } else { /* ---------- * Both are positive * Must compare absolute values * ---------- */ switch (cmp_abs(var1, var2)) { case 0: /* ---------- * ABS(var1) == ABS(var2) * result = ZERO * ---------- */ zero_var(result); result->dscale = Max(var1->dscale, var2->dscale); break; case 1: /* ---------- * ABS(var1) > ABS(var2) * result = +(ABS(var1) - ABS(var2)) * ---------- */ sub_abs(var1, var2, result); result->sign = NUMERIC_POS; break; case -1: /* ---------- * ABS(var1) < ABS(var2) * result = -(ABS(var2) - ABS(var1)) * ---------- */ sub_abs(var2, var1, result); result->sign = NUMERIC_NEG; break; } } } else { if (var2->sign == NUMERIC_NEG) { /* ---------- * Both are negative * Must compare absolute values * ---------- */ switch (cmp_abs(var1, var2)) { case 0: /* ---------- * ABS(var1) == ABS(var2) * result = ZERO * ---------- */ zero_var(result); result->dscale = Max(var1->dscale, var2->dscale); break; case 1: /* ---------- * ABS(var1) > ABS(var2) * result = -(ABS(var1) - ABS(var2)) * ---------- */ sub_abs(var1, var2, result); result->sign = NUMERIC_NEG; break; case -1: /* ---------- * ABS(var1) < ABS(var2) * result = +(ABS(var2) - ABS(var1)) * ---------- */ sub_abs(var2, var1, result); result->sign = NUMERIC_POS; break; } } else { /* ---------- * var1 is negative, var2 is positive * result = -(ABS(var1) + ABS(var2)) * ---------- */ add_abs(var1, var2, result); result->sign = NUMERIC_NEG; } } } /* * mul_var() - * * Multiplication on variable level. Product of var1 * var2 is stored * in result. Result is rounded to no more than rscale fractional digits. */ static void mul_var(NumericVar *var1, NumericVar *var2, NumericVar *result, int rscale) { int res_ndigits; int res_sign; int res_weight; int maxdigits; int *dig; int carry; int maxdig; int newdig; NumericDigit *res_digits; int i, ri, i1, i2; /* copy these values into local vars for speed in inner loop */ int var1ndigits = var1->ndigits; int var2ndigits = var2->ndigits; NumericDigit *var1digits = var1->digits; NumericDigit *var2digits = var2->digits; if (var1ndigits == 0 || var2ndigits == 0) { /* one or both inputs is zero; so is result */ zero_var(result); result->dscale = rscale; return; } /* Determine result sign and (maximum possible) weight */ if (var1->sign == var2->sign) res_sign = NUMERIC_POS; else res_sign = NUMERIC_NEG; res_weight = var1->weight + var2->weight + 2; /* * Determine number of result digits to compute. If the exact result * would have more than rscale fractional digits, truncate the computation * with MUL_GUARD_DIGITS guard digits. We do that by pretending that one * or both inputs have fewer digits than they really do. */ res_ndigits = var1ndigits + var2ndigits + 1; maxdigits = res_weight + 1 + (rscale * DEC_DIGITS) + MUL_GUARD_DIGITS; if (res_ndigits > maxdigits) { if (maxdigits < 3) { /* no useful precision at all in the result... */ zero_var(result); result->dscale = rscale; return; } /* force maxdigits odd so that input ndigits can be equal */ if ((maxdigits & 1) == 0) maxdigits++; if (var1ndigits > var2ndigits) { var1ndigits -= res_ndigits - maxdigits; if (var1ndigits < var2ndigits) var1ndigits = var2ndigits = (var1ndigits + var2ndigits) / 2; } else { var2ndigits -= res_ndigits - maxdigits; if (var2ndigits < var1ndigits) var1ndigits = var2ndigits = (var1ndigits + var2ndigits) / 2; } res_ndigits = maxdigits; Assert(res_ndigits == var1ndigits + var2ndigits + 1); } /* * We do the arithmetic in an array "dig[]" of signed int's. Since * INT_MAX is noticeably larger than NBASE*NBASE, this gives us headroom * to avoid normalizing carries immediately. * * maxdig tracks the maximum possible value of any dig[] entry; when this * threatens to exceed INT_MAX, we take the time to propagate carries. To * avoid overflow in maxdig itself, it actually represents the max * possible value divided by NBASE-1. */ dig = (int *) palloc0(res_ndigits * sizeof(int)); maxdig = 0; ri = res_ndigits - 1; for (i1 = var1ndigits - 1; i1 >= 0; ri--, i1--) { int var1digit = var1digits[i1]; if (var1digit == 0) continue; /* Time to normalize? */ maxdig += var1digit; if (maxdig > INT_MAX / (NBASE - 1)) { /* Yes, do it */ carry = 0; for (i = res_ndigits - 1; i >= 0; i--) { newdig = dig[i] + carry; if (newdig >= NBASE) { carry = newdig / NBASE; newdig -= carry * NBASE; } else carry = 0; dig[i] = newdig; } Assert(carry == 0); /* Reset maxdig to indicate new worst-case */ maxdig = 1 + var1digit; } /* Add appropriate multiple of var2 into the accumulator */ i = ri; for (i2 = var2ndigits - 1; i2 >= 0; i2--) dig[i--] += var1digit * var2digits[i2]; } /* * Now we do a final carry propagation pass to normalize the result, which * we combine with storing the result digits into the output. Note that * this is still done at full precision w/guard digits. */ alloc_var(result, res_ndigits); res_digits = result->digits; carry = 0; for (i = res_ndigits - 1; i >= 0; i--) { newdig = dig[i] + carry; if (newdig >= NBASE) { carry = newdig / NBASE; newdig -= carry * NBASE; } else carry = 0; res_digits[i] = newdig; } Assert(carry == 0); pfree(dig); /* * Finally, round the result to the requested precision. */ result->weight = res_weight; result->sign = res_sign; /* Round to target rscale (and set result->dscale) */ round_var(result, rscale); /* Strip leading and trailing zeroes */ strip_var(result); } /* * div_var() - * * Division on variable level. Quotient of var1 / var2 is stored in result. * The quotient is figured to exactly rscale fractional digits. * If round is true, it is rounded at the rscale'th digit; if false, it * is truncated (towards zero) at that digit. */ static void div_var(NumericVar *var1, NumericVar *var2, NumericVar *result, int rscale, bool round) { int div_ndigits; int res_ndigits; int res_sign; int res_weight; int carry; int borrow; int divisor1; int divisor2; NumericDigit *dividend; NumericDigit *divisor; NumericDigit *res_digits; int i; int j; /* copy these values into local vars for speed in inner loop */ int var1ndigits = var1->ndigits; int var2ndigits = var2->ndigits; /* * First of all division by zero check; we must not be handed an * unnormalized divisor. */ if (var2ndigits == 0 || var2->digits[0] == 0) ereport(ERROR, (errcode(ERRCODE_DIVISION_BY_ZERO), errmsg("division by zero"))); /* * Now result zero check */ if (var1ndigits == 0) { zero_var(result); result->dscale = rscale; return; } /* * Determine the result sign, weight and number of digits to calculate. * The weight figured here is correct if the emitted quotient has no * leading zero digits; otherwise strip_var() will fix things up. */ if (var1->sign == var2->sign) res_sign = NUMERIC_POS; else res_sign = NUMERIC_NEG; res_weight = var1->weight - var2->weight; /* The number of accurate result digits we need to produce: */ res_ndigits = res_weight + 1 + (rscale + DEC_DIGITS - 1) / DEC_DIGITS; /* ... but always at least 1 */ res_ndigits = Max(res_ndigits, 1); /* If rounding needed, figure one more digit to ensure correct result */ if (round) res_ndigits++; /* * The working dividend normally requires res_ndigits + var2ndigits * digits, but make it at least var1ndigits so we can load all of var1 * into it. (There will be an additional digit dividend[0] in the * dividend space, but for consistency with Knuth's notation we don't * count that in div_ndigits.) */ div_ndigits = res_ndigits + var2ndigits; div_ndigits = Max(div_ndigits, var1ndigits); /* * We need a workspace with room for the working dividend (div_ndigits+1 * digits) plus room for the possibly-normalized divisor (var2ndigits * digits). It is convenient also to have a zero at divisor[0] with the * actual divisor data in divisor[1 .. var2ndigits]. Transferring the * digits into the workspace also allows us to realloc the result (which * might be the same as either input var) before we begin the main loop. * Note that we use palloc0 to ensure that divisor[0], dividend[0], and * any additional dividend positions beyond var1ndigits, start out 0. */ dividend = (NumericDigit *) palloc0((div_ndigits + var2ndigits + 2) * sizeof(NumericDigit)); divisor = dividend + (div_ndigits + 1); memcpy(dividend + 1, var1->digits, var1ndigits * sizeof(NumericDigit)); memcpy(divisor + 1, var2->digits, var2ndigits * sizeof(NumericDigit)); /* * Now we can realloc the result to hold the generated quotient digits. */ alloc_var(result, res_ndigits); res_digits = result->digits; if (var2ndigits == 1) { /* * If there's only a single divisor digit, we can use a fast path (cf. * Knuth section 4.3.1 exercise 16). */ divisor1 = divisor[1]; carry = 0; for (i = 0; i < res_ndigits; i++) { carry = carry * NBASE + dividend[i + 1]; res_digits[i] = carry / divisor1; carry = carry % divisor1; } } else { /* * The full multiple-place algorithm is taken from Knuth volume 2, * Algorithm 4.3.1D. * * We need the first divisor digit to be >= NBASE/2. If it isn't, * make it so by scaling up both the divisor and dividend by the * factor "d". (The reason for allocating dividend[0] above is to * leave room for possible carry here.) */ if (divisor[1] < HALF_NBASE) { int d = NBASE / (divisor[1] + 1); carry = 0; for (i = var2ndigits; i > 0; i--) { carry += divisor[i] * d; divisor[i] = carry % NBASE; carry = carry / NBASE; } Assert(carry == 0); carry = 0; /* at this point only var1ndigits of dividend can be nonzero */ for (i = var1ndigits; i >= 0; i--) { carry += dividend[i] * d; dividend[i] = carry % NBASE; carry = carry / NBASE; } Assert(carry == 0); Assert(divisor[1] >= HALF_NBASE); } /* First 2 divisor digits are used repeatedly in main loop */ divisor1 = divisor[1]; divisor2 = divisor[2]; /* * Begin the main loop. Each iteration of this loop produces the j'th * quotient digit by dividing dividend[j .. j + var2ndigits] by the * divisor; this is essentially the same as the common manual * procedure for long division. */ for (j = 0; j < res_ndigits; j++) { /* Estimate quotient digit from the first two dividend digits */ int next2digits = dividend[j] * NBASE + dividend[j + 1]; int qhat; /* * If next2digits are 0, then quotient digit must be 0 and there's * no need to adjust the working dividend. It's worth testing * here to fall out ASAP when processing trailing zeroes in a * dividend. */ if (next2digits == 0) { res_digits[j] = 0; continue; } if (dividend[j] == divisor1) qhat = NBASE - 1; else qhat = next2digits / divisor1; /* * Adjust quotient digit if it's too large. Knuth proves that * after this step, the quotient digit will be either correct or * just one too large. (Note: it's OK to use dividend[j+2] here * because we know the divisor length is at least 2.) */ while (divisor2 * qhat > (next2digits - qhat * divisor1) * NBASE + dividend[j + 2]) qhat--; /* As above, need do nothing more when quotient digit is 0 */ if (qhat > 0) { /* * Multiply the divisor by qhat, and subtract that from the * working dividend. "carry" tracks the multiplication, * "borrow" the subtraction (could we fold these together?) */ carry = 0; borrow = 0; for (i = var2ndigits; i >= 0; i--) { carry += divisor[i] * qhat; borrow -= carry % NBASE; carry = carry / NBASE; borrow += dividend[j + i]; if (borrow < 0) { dividend[j + i] = borrow + NBASE; borrow = -1; } else { dividend[j + i] = borrow; borrow = 0; } } Assert(carry == 0); /* * If we got a borrow out of the top dividend digit, then * indeed qhat was one too large. Fix it, and add back the * divisor to correct the working dividend. (Knuth proves * that this will occur only about 3/NBASE of the time; hence, * it's a good idea to test this code with small NBASE to be * sure this section gets exercised.) */ if (borrow) { qhat--; carry = 0; for (i = var2ndigits; i >= 0; i--) { carry += dividend[j + i] + divisor[i]; if (carry >= NBASE) { dividend[j + i] = carry - NBASE; carry = 1; } else { dividend[j + i] = carry; carry = 0; } } /* A carry should occur here to cancel the borrow above */ Assert(carry == 1); } } /* And we're done with this quotient digit */ res_digits[j] = qhat; } } pfree(dividend); /* * Finally, round or truncate the result to the requested precision. */ result->weight = res_weight; result->sign = res_sign; /* Round or truncate to target rscale (and set result->dscale) */ if (round) round_var(result, rscale); else trunc_var(result, rscale); /* Strip leading and trailing zeroes */ strip_var(result); } /* * div_var_fast() - * * This has the same API as div_var, but is implemented using the division * algorithm from the "FM" library, rather than Knuth's schoolbook-division * approach. This is significantly faster but can produce inaccurate * results, because it sometimes has to propagate rounding to the left, * and so we can never be entirely sure that we know the requested digits * exactly. We compute DIV_GUARD_DIGITS extra digits, but there is * no certainty that that's enough. We use this only in the transcendental * function calculation routines, where everything is approximate anyway. */ static void div_var_fast(NumericVar *var1, NumericVar *var2, NumericVar *result, int rscale, bool round) { int div_ndigits; int res_sign; int res_weight; int *div; int qdigit; int carry; int maxdiv; int newdig; NumericDigit *res_digits; double fdividend, fdivisor, fdivisorinverse, fquotient; int qi; int i; /* copy these values into local vars for speed in inner loop */ int var1ndigits = var1->ndigits; int var2ndigits = var2->ndigits; NumericDigit *var1digits = var1->digits; NumericDigit *var2digits = var2->digits; /* * First of all division by zero check; we must not be handed an * unnormalized divisor. */ if (var2ndigits == 0 || var2digits[0] == 0) ereport(ERROR, (errcode(ERRCODE_DIVISION_BY_ZERO), errmsg("division by zero"))); /* * Now result zero check */ if (var1ndigits == 0) { zero_var(result); result->dscale = rscale; return; } /* * Determine the result sign, weight and number of digits to calculate */ if (var1->sign == var2->sign) res_sign = NUMERIC_POS; else res_sign = NUMERIC_NEG; res_weight = var1->weight - var2->weight + 1; /* The number of accurate result digits we need to produce: */ div_ndigits = res_weight + 1 + (rscale + DEC_DIGITS - 1) / DEC_DIGITS; /* Add guard digits for roundoff error */ div_ndigits += DIV_GUARD_DIGITS; if (div_ndigits < DIV_GUARD_DIGITS) div_ndigits = DIV_GUARD_DIGITS; /* Must be at least var1ndigits, too, to simplify data-loading loop */ if (div_ndigits < var1ndigits) div_ndigits = var1ndigits; /* * We do the arithmetic in an array "div[]" of signed int's. Since * INT_MAX is noticeably larger than NBASE*NBASE, this gives us headroom * to avoid normalizing carries immediately. * * We start with div[] containing one zero digit followed by the * dividend's digits (plus appended zeroes to reach the desired precision * including guard digits). Each step of the main loop computes an * (approximate) quotient digit and stores it into div[], removing one * position of dividend space. A final pass of carry propagation takes * care of any mistaken quotient digits. */ div = (int *) palloc0((div_ndigits + 1) * sizeof(int)); for (i = 0; i < var1ndigits; i++) div[i + 1] = var1digits[i]; /* * We estimate each quotient digit using floating-point arithmetic, taking * the first four digits of the (current) dividend and divisor. This must * be float to avoid overflow. */ fdivisor = (double) var2digits[0]; for (i = 1; i < 4; i++) { fdivisor *= NBASE; if (i < var2ndigits) fdivisor += (double) var2digits[i]; } fdivisorinverse = 1.0 / fdivisor; /* * maxdiv tracks the maximum possible absolute value of any div[] entry; * when this threatens to exceed INT_MAX, we take the time to propagate * carries. To avoid overflow in maxdiv itself, it actually represents * the max possible abs. value divided by NBASE-1. */ maxdiv = 1; /* * Outer loop computes next quotient digit, which will go into div[qi] */ for (qi = 0; qi < div_ndigits; qi++) { /* Approximate the current dividend value */ fdividend = (double) div[qi]; for (i = 1; i < 4; i++) { fdividend *= NBASE; if (qi + i <= div_ndigits) fdividend += (double) div[qi + i]; } /* Compute the (approximate) quotient digit */ fquotient = fdividend * fdivisorinverse; qdigit = (fquotient >= 0.0) ? ((int) fquotient) : (((int) fquotient) - 1); /* truncate towards -infinity */ if (qdigit != 0) { /* Do we need to normalize now? */ maxdiv += Abs(qdigit); if (maxdiv > INT_MAX / (NBASE - 1)) { /* Yes, do it */ carry = 0; for (i = div_ndigits; i > qi; i--) { newdig = div[i] + carry; if (newdig < 0) { carry = -((-newdig - 1) / NBASE) - 1; newdig -= carry * NBASE; } else if (newdig >= NBASE) { carry = newdig / NBASE; newdig -= carry * NBASE; } else carry = 0; div[i] = newdig; } newdig = div[qi] + carry; div[qi] = newdig; /* * All the div[] digits except possibly div[qi] are now in the * range 0..NBASE-1. */ maxdiv = Abs(newdig) / (NBASE - 1); maxdiv = Max(maxdiv, 1); /* * Recompute the quotient digit since new info may have * propagated into the top four dividend digits */ fdividend = (double) div[qi]; for (i = 1; i < 4; i++) { fdividend *= NBASE; if (qi + i <= div_ndigits) fdividend += (double) div[qi + i]; } /* Compute the (approximate) quotient digit */ fquotient = fdividend * fdivisorinverse; qdigit = (fquotient >= 0.0) ? ((int) fquotient) : (((int) fquotient) - 1); /* truncate towards -infinity */ maxdiv += Abs(qdigit); } /* Subtract off the appropriate multiple of the divisor */ if (qdigit != 0) { int istop = Min(var2ndigits, div_ndigits - qi + 1); for (i = 0; i < istop; i++) div[qi + i] -= qdigit * var2digits[i]; } } /* * The dividend digit we are about to replace might still be nonzero. * Fold it into the next digit position. We don't need to worry about * overflow here since this should nearly cancel with the subtraction * of the divisor. */ div[qi + 1] += div[qi] * NBASE; div[qi] = qdigit; } /* * Approximate and store the last quotient digit (div[div_ndigits]) */ fdividend = (double) div[qi]; for (i = 1; i < 4; i++) fdividend *= NBASE; fquotient = fdividend * fdivisorinverse; qdigit = (fquotient >= 0.0) ? ((int) fquotient) : (((int) fquotient) - 1); /* truncate towards -infinity */ div[qi] = qdigit; /* * Now we do a final carry propagation pass to normalize the result, which * we combine with storing the result digits into the output. Note that * this is still done at full precision w/guard digits. */ alloc_var(result, div_ndigits + 1); res_digits = result->digits; carry = 0; for (i = div_ndigits; i >= 0; i--) { newdig = div[i] + carry; if (newdig < 0) { carry = -((-newdig - 1) / NBASE) - 1; newdig -= carry * NBASE; } else if (newdig >= NBASE) { carry = newdig / NBASE; newdig -= carry * NBASE; } else carry = 0; res_digits[i] = newdig; } Assert(carry == 0); pfree(div); /* * Finally, round the result to the requested precision. */ result->weight = res_weight; result->sign = res_sign; /* Round to target rscale (and set result->dscale) */ if (round) round_var(result, rscale); else trunc_var(result, rscale); /* Strip leading and trailing zeroes */ strip_var(result); } /* * Default scale selection for division * * Returns the appropriate result scale for the division result. */ static int select_div_scale(NumericVar *var1, NumericVar *var2) { int weight1, weight2, qweight, i; NumericDigit firstdigit1, firstdigit2; int rscale; /* * The result scale of a division isn't specified in any SQL standard. For * PostgreSQL we select a result scale that will give at least * NUMERIC_MIN_SIG_DIGITS significant digits, so that numeric gives a * result no less accurate than float8; but use a scale not less than * either input's display scale. */ /* Get the actual (normalized) weight and first digit of each input */ weight1 = 0; /* values to use if var1 is zero */ firstdigit1 = 0; for (i = 0; i < var1->ndigits; i++) { firstdigit1 = var1->digits[i]; if (firstdigit1 != 0) { weight1 = var1->weight - i; break; } } weight2 = 0; /* values to use if var2 is zero */ firstdigit2 = 0; for (i = 0; i < var2->ndigits; i++) { firstdigit2 = var2->digits[i]; if (firstdigit2 != 0) { weight2 = var2->weight - i; break; } } /* * Estimate weight of quotient. If the two first digits are equal, we * can't be sure, but assume that var1 is less than var2. */ qweight = weight1 - weight2; if (firstdigit1 <= firstdigit2) qweight--; /* Select result scale */ rscale = NUMERIC_MIN_SIG_DIGITS - qweight * DEC_DIGITS; rscale = Max(rscale, var1->dscale); rscale = Max(rscale, var2->dscale); rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE); rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE); return rscale; } /* * mod_var() - * * Calculate the modulo of two numerics at variable level */ static void mod_var(NumericVar *var1, NumericVar *var2, NumericVar *result) { NumericVar tmp; init_var(&tmp); /* --------- * We do this using the equation * mod(x,y) = x - trunc(x/y)*y * div_var can be persuaded to give us trunc(x/y) directly. * ---------- */ div_var(var1, var2, &tmp, 0, false); mul_var(var2, &tmp, &tmp, var2->dscale); sub_var(var1, &tmp, result); free_var(&tmp); } /* * ceil_var() - * * Return the smallest integer greater than or equal to the argument * on variable level */ static void ceil_var(NumericVar *var, NumericVar *result) { NumericVar tmp; init_var(&tmp); set_var_from_var(var, &tmp); trunc_var(&tmp, 0); if (var->sign == NUMERIC_POS && cmp_var(var, &tmp) != 0) add_var(&tmp, &const_one, &tmp); set_var_from_var(&tmp, result); free_var(&tmp); } /* * floor_var() - * * Return the largest integer equal to or less than the argument * on variable level */ static void floor_var(NumericVar *var, NumericVar *result) { NumericVar tmp; init_var(&tmp); set_var_from_var(var, &tmp); trunc_var(&tmp, 0); if (var->sign == NUMERIC_NEG && cmp_var(var, &tmp) != 0) sub_var(&tmp, &const_one, &tmp); set_var_from_var(&tmp, result); free_var(&tmp); } /* * sqrt_var() - * * Compute the square root of x using Newton's algorithm */ static void sqrt_var(NumericVar *arg, NumericVar *result, int rscale) { NumericVar tmp_arg; NumericVar tmp_val; NumericVar last_val; int local_rscale; int stat; local_rscale = rscale + 8; stat = cmp_var(arg, &const_zero); if (stat == 0) { zero_var(result); result->dscale = rscale; return; } /* * SQL2003 defines sqrt() in terms of power, so we need to emit the right * SQLSTATE error code if the operand is negative. */ if (stat < 0) ereport(ERROR, (errcode(ERRCODE_INVALID_ARGUMENT_FOR_POWER_FUNCTION), errmsg("cannot take square root of a negative number"))); init_var(&tmp_arg); init_var(&tmp_val); init_var(&last_val); /* Copy arg in case it is the same var as result */ set_var_from_var(arg, &tmp_arg); /* * Initialize the result to the first guess */ alloc_var(result, 1); result->digits[0] = tmp_arg.digits[0] / 2; if (result->digits[0] == 0) result->digits[0] = 1; result->weight = tmp_arg.weight / 2; result->sign = NUMERIC_POS; set_var_from_var(result, &last_val); for (;;) { div_var_fast(&tmp_arg, result, &tmp_val, local_rscale, true); add_var(result, &tmp_val, result); mul_var(result, &const_zero_point_five, result, local_rscale); if (cmp_var(&last_val, result) == 0) break; set_var_from_var(result, &last_val); } free_var(&last_val); free_var(&tmp_val); free_var(&tmp_arg); /* Round to requested precision */ round_var(result, rscale); } /* * exp_var() - * * Raise e to the power of x */ static void exp_var(NumericVar *arg, NumericVar *result, int rscale) { NumericVar x; int xintval; bool xneg = FALSE; int local_rscale; /*---------- * We separate the integral and fraction parts of x, then compute * e^x = e^xint * e^xfrac * where e = exp(1) and e^xfrac = exp(xfrac) are computed by * exp_var_internal; the limited range of inputs allows that routine * to do a good job with a simple Taylor series. Raising e^xint is * done by repeated multiplications in power_var_int. *---------- */ init_var(&x); set_var_from_var(arg, &x); if (x.sign == NUMERIC_NEG) { xneg = TRUE; x.sign = NUMERIC_POS; } /* Extract the integer part, remove it from x */ xintval = 0; while (x.weight >= 0) { xintval *= NBASE; if (x.ndigits > 0) { xintval += x.digits[0]; x.digits++; x.ndigits--; } x.weight--; /* Guard against overflow */ if (xintval >= NUMERIC_MAX_RESULT_SCALE * 3) ereport(ERROR, (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), errmsg("argument for function \"exp\" too big"))); } /* Select an appropriate scale for internal calculation */ local_rscale = rscale + MUL_GUARD_DIGITS * 2; /* Compute e^xfrac */ exp_var_internal(&x, result, local_rscale); /* If there's an integer part, multiply by e^xint */ if (xintval > 0) { NumericVar e; init_var(&e); exp_var_internal(&const_one, &e, local_rscale); power_var_int(&e, xintval, &e, local_rscale); mul_var(&e, result, result, local_rscale); free_var(&e); } /* Compensate for input sign, and round to requested rscale */ if (xneg) div_var_fast(&const_one, result, result, rscale, true); else round_var(result, rscale); free_var(&x); } /* * exp_var_internal() - * * Raise e to the power of x, where 0 <= x <= 1 * * NB: the result should be good to at least rscale digits, but it has * *not* been rounded off; the caller must do that if wanted. */ static void exp_var_internal(NumericVar *arg, NumericVar *result, int rscale) { NumericVar x; NumericVar xpow; NumericVar ifac; NumericVar elem; NumericVar ni; int ndiv2 = 0; int local_rscale; init_var(&x); init_var(&xpow); init_var(&ifac); init_var(&elem); init_var(&ni); set_var_from_var(arg, &x); Assert(x.sign == NUMERIC_POS); local_rscale = rscale + 8; /* Reduce input into range 0 <= x <= 0.01 */ while (cmp_var(&x, &const_zero_point_01) > 0) { ndiv2++; local_rscale++; mul_var(&x, &const_zero_point_five, &x, x.dscale + 1); } /* * Use the Taylor series * * exp(x) = 1 + x + x^2/2! + x^3/3! + ... * * Given the limited range of x, this should converge reasonably quickly. * We run the series until the terms fall below the local_rscale limit. */ add_var(&const_one, &x, result); set_var_from_var(&x, &xpow); set_var_from_var(&const_one, &ifac); set_var_from_var(&const_one, &ni); for (;;) { add_var(&ni, &const_one, &ni); mul_var(&xpow, &x, &xpow, local_rscale); mul_var(&ifac, &ni, &ifac, 0); div_var_fast(&xpow, &ifac, &elem, local_rscale, true); if (elem.ndigits == 0) break; add_var(result, &elem, result); } /* Compensate for argument range reduction */ while (ndiv2-- > 0) mul_var(result, result, result, local_rscale); free_var(&x); free_var(&xpow); free_var(&ifac); free_var(&elem); free_var(&ni); } /* * ln_var() - * * Compute the natural log of x */ static void ln_var(NumericVar *arg, NumericVar *result, int rscale) { NumericVar x; NumericVar xx; NumericVar ni; NumericVar elem; NumericVar fact; int local_rscale; int cmp; cmp = cmp_var(arg, &const_zero); if (cmp == 0) ereport(ERROR, (errcode(ERRCODE_INVALID_ARGUMENT_FOR_LOG), errmsg("cannot take logarithm of zero"))); else if (cmp < 0) ereport(ERROR, (errcode(ERRCODE_INVALID_ARGUMENT_FOR_LOG), errmsg("cannot take logarithm of a negative number"))); local_rscale = rscale + 8; init_var(&x); init_var(&xx); init_var(&ni); init_var(&elem); init_var(&fact); set_var_from_var(arg, &x); set_var_from_var(&const_two, &fact); /* Reduce input into range 0.9 < x < 1.1 */ while (cmp_var(&x, &const_zero_point_nine) <= 0) { local_rscale++; sqrt_var(&x, &x, local_rscale); mul_var(&fact, &const_two, &fact, 0); } while (cmp_var(&x, &const_one_point_one) >= 0) { local_rscale++; sqrt_var(&x, &x, local_rscale); mul_var(&fact, &const_two, &fact, 0); } /* * We use the Taylor series for 0.5 * ln((1+z)/(1-z)), * * z + z^3/3 + z^5/5 + ... * * where z = (x-1)/(x+1) is in the range (approximately) -0.053 .. 0.048 * due to the above range-reduction of x. * * The convergence of this is not as fast as one would like, but is * tolerable given that z is small. */ sub_var(&x, &const_one, result); add_var(&x, &const_one, &elem); div_var_fast(result, &elem, result, local_rscale, true); set_var_from_var(result, &xx); mul_var(result, result, &x, local_rscale); set_var_from_var(&const_one, &ni); for (;;) { add_var(&ni, &const_two, &ni); mul_var(&xx, &x, &xx, local_rscale); div_var_fast(&xx, &ni, &elem, local_rscale, true); if (elem.ndigits == 0) break; add_var(result, &elem, result); if (elem.weight < (result->weight - local_rscale * 2 / DEC_DIGITS)) break; } /* Compensate for argument range reduction, round to requested rscale */ mul_var(result, &fact, result, rscale); free_var(&x); free_var(&xx); free_var(&ni); free_var(&elem); free_var(&fact); } /* * log_var() - * * Compute the logarithm of num in a given base. * * Note: this routine chooses dscale of the result. */ static void log_var(NumericVar *base, NumericVar *num, NumericVar *result) { NumericVar ln_base; NumericVar ln_num; int dec_digits; int rscale; int local_rscale; init_var(&ln_base); init_var(&ln_num); /* Set scale for ln() calculations --- compare numeric_ln() */ /* Approx decimal digits before decimal point */ dec_digits = (num->weight + 1) * DEC_DIGITS; if (dec_digits > 1) rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(dec_digits - 1); else if (dec_digits < 1) rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(1 - dec_digits); else rscale = NUMERIC_MIN_SIG_DIGITS; rscale = Max(rscale, base->dscale); rscale = Max(rscale, num->dscale); rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE); rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE); local_rscale = rscale + 8; /* Form natural logarithms */ ln_var(base, &ln_base, local_rscale); ln_var(num, &ln_num, local_rscale); ln_base.dscale = rscale; ln_num.dscale = rscale; /* Select scale for division result */ rscale = select_div_scale(&ln_num, &ln_base); div_var_fast(&ln_num, &ln_base, result, rscale, true); free_var(&ln_num); free_var(&ln_base); } /* * power_var() - * * Raise base to the power of exp * * Note: this routine chooses dscale of the result. */ static void power_var(NumericVar *base, NumericVar *exp, NumericVar *result) { NumericVar ln_base; NumericVar ln_num; int dec_digits; int rscale; int local_rscale; double val; /* If exp can be represented as an integer, use power_var_int */ if (exp->ndigits == 0 || exp->ndigits <= exp->weight + 1) { /* exact integer, but does it fit in int? */ NumericVar x; int64 expval64; /* must copy because numericvar_to_int8() scribbles on input */ init_var(&x); set_var_from_var(exp, &x); if (numericvar_to_int8(&x, &expval64)) { int expval = (int) expval64; /* Test for overflow by reverse-conversion. */ if ((int64) expval == expval64) { /* Okay, select rscale */ rscale = NUMERIC_MIN_SIG_DIGITS; rscale = Max(rscale, base->dscale); rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE); rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE); power_var_int(base, expval, result, rscale); free_var(&x); return; } } free_var(&x); } /* * This avoids log(0) for cases of 0 raised to a non-integer. 0 ^ 0 * handled by power_var_int(). */ if (cmp_var(base, &const_zero) == 0) { set_var_from_var(&const_zero, result); result->dscale = NUMERIC_MIN_SIG_DIGITS; /* no need to round */ return; } init_var(&ln_base); init_var(&ln_num); /* Set scale for ln() calculation --- need extra accuracy here */ /* Approx decimal digits before decimal point */ dec_digits = (base->weight + 1) * DEC_DIGITS; if (dec_digits > 1) rscale = NUMERIC_MIN_SIG_DIGITS * 2 - (int) log10(dec_digits - 1); else if (dec_digits < 1) rscale = NUMERIC_MIN_SIG_DIGITS * 2 - (int) log10(1 - dec_digits); else rscale = NUMERIC_MIN_SIG_DIGITS * 2; rscale = Max(rscale, base->dscale * 2); rscale = Max(rscale, exp->dscale * 2); rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE * 2); rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE * 2); local_rscale = rscale + 8; ln_var(base, &ln_base, local_rscale); mul_var(&ln_base, exp, &ln_num, local_rscale); /* Set scale for exp() -- compare numeric_exp() */ /* convert input to float8, ignoring overflow */ val = numericvar_to_double_no_overflow(&ln_num); /* * log10(result) = num * log10(e), so this is approximately the weight: */ val *= 0.434294481903252; /* limit to something that won't cause integer overflow */ val = Max(val, -NUMERIC_MAX_RESULT_SCALE); val = Min(val, NUMERIC_MAX_RESULT_SCALE); rscale = NUMERIC_MIN_SIG_DIGITS - (int) val; rscale = Max(rscale, base->dscale); rscale = Max(rscale, exp->dscale); rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE); rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE); exp_var(&ln_num, result, rscale); free_var(&ln_num); free_var(&ln_base); } /* * power_var_int() - * * Raise base to the power of exp, where exp is an integer. */ static void power_var_int(NumericVar *base, int exp, NumericVar *result, int rscale) { bool neg; NumericVar base_prod; int local_rscale; switch (exp) { case 0: /* * While 0 ^ 0 can be either 1 or indeterminate (error), we treat * it as 1 because most programming languages do this. SQL:2003 * also requires a return value of 1. * http://en.wikipedia.org/wiki/Exponentiation#Zero_to_the_zero_pow * er */ set_var_from_var(&const_one, result); result->dscale = rscale; /* no need to round */ return; case 1: set_var_from_var(base, result); round_var(result, rscale); return; case -1: div_var(&const_one, base, result, rscale, true); return; case 2: mul_var(base, base, result, rscale); return; default: break; } /* * The general case repeatedly multiplies base according to the bit * pattern of exp. We do the multiplications with some extra precision. */ neg = (exp < 0); exp = Abs(exp); local_rscale = rscale + MUL_GUARD_DIGITS * 2; init_var(&base_prod); set_var_from_var(base, &base_prod); if (exp & 1) set_var_from_var(base, result); else set_var_from_var(&const_one, result); while ((exp >>= 1) > 0) { mul_var(&base_prod, &base_prod, &base_prod, local_rscale); if (exp & 1) mul_var(&base_prod, result, result, local_rscale); } free_var(&base_prod); /* Compensate for input sign, and round to requested rscale */ if (neg) div_var_fast(&const_one, result, result, rscale, true); else round_var(result, rscale); } /* ---------------------------------------------------------------------- * * Following are the lowest level functions that operate unsigned * on the variable level * * ---------------------------------------------------------------------- */ /* ---------- * cmp_abs() - * * Compare the absolute values of var1 and var2 * Returns: -1 for ABS(var1) < ABS(var2) * 0 for ABS(var1) == ABS(var2) * 1 for ABS(var1) > ABS(var2) * ---------- */ static int cmp_abs(NumericVar *var1, NumericVar *var2) { return cmp_abs_common(var1->digits, var1->ndigits, var1->weight, var2->digits, var2->ndigits, var2->weight); } /* ---------- * cmp_abs_common() - * * Main routine of cmp_abs(). This function can be used by both * NumericVar and Numeric. * ---------- */ static int cmp_abs_common(const NumericDigit *var1digits, int var1ndigits, int var1weight, const NumericDigit *var2digits, int var2ndigits, int var2weight) { int i1 = 0; int i2 = 0; /* Check any digits before the first common digit */ while (var1weight > var2weight && i1 < var1ndigits) { if (var1digits[i1++] != 0) return 1; var1weight--; } while (var2weight > var1weight && i2 < var2ndigits) { if (var2digits[i2++] != 0) return -1; var2weight--; } /* At this point, either w1 == w2 or we've run out of digits */ if (var1weight == var2weight) { while (i1 < var1ndigits && i2 < var2ndigits) { int stat = var1digits[i1++] - var2digits[i2++]; if (stat) { if (stat > 0) return 1; return -1; } } } /* * At this point, we've run out of digits on one side or the other; so any * remaining nonzero digits imply that side is larger */ while (i1 < var1ndigits) { if (var1digits[i1++] != 0) return 1; } while (i2 < var2ndigits) { if (var2digits[i2++] != 0) return -1; } return 0; } /* * add_abs() - * * Add the absolute values of two variables into result. * result might point to one of the operands without danger. */ static void add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result) { NumericDigit *res_buf; NumericDigit *res_digits; int res_ndigits; int res_weight; int res_rscale, rscale1, rscale2; int res_dscale; int i, i1, i2; int carry = 0; /* copy these values into local vars for speed in inner loop */ int var1ndigits = var1->ndigits; int var2ndigits = var2->ndigits; NumericDigit *var1digits = var1->digits; NumericDigit *var2digits = var2->digits; res_weight = Max(var1->weight, var2->weight) + 1; res_dscale = Max(var1->dscale, var2->dscale); /* Note: here we are figuring rscale in base-NBASE digits */ rscale1 = var1->ndigits - var1->weight - 1; rscale2 = var2->ndigits - var2->weight - 1; res_rscale = Max(rscale1, rscale2); res_ndigits = res_rscale + res_weight + 1; if (res_ndigits <= 0) res_ndigits = 1; res_buf = digitbuf_alloc(res_ndigits + 1); res_buf[0] = 0; /* spare digit for later rounding */ res_digits = res_buf + 1; i1 = res_rscale + var1->weight + 1; i2 = res_rscale + var2->weight + 1; for (i = res_ndigits - 1; i >= 0; i--) { i1--; i2--; if (i1 >= 0 && i1 < var1ndigits) carry += var1digits[i1]; if (i2 >= 0 && i2 < var2ndigits) carry += var2digits[i2]; if (carry >= NBASE) { res_digits[i] = carry - NBASE; carry = 1; } else { res_digits[i] = carry; carry = 0; } } Assert(carry == 0); /* else we failed to allow for carry out */ digitbuf_free(result->buf); result->ndigits = res_ndigits; result->buf = res_buf; result->digits = res_digits; result->weight = res_weight; result->dscale = res_dscale; /* Remove leading/trailing zeroes */ strip_var(result); } /* * sub_abs() * * Subtract the absolute value of var2 from the absolute value of var1 * and store in result. result might point to one of the operands * without danger. * * ABS(var1) MUST BE GREATER OR EQUAL ABS(var2) !!! */ static void sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result) { NumericDigit *res_buf; NumericDigit *res_digits; int res_ndigits; int res_weight; int res_rscale, rscale1, rscale2; int res_dscale; int i, i1, i2; int borrow = 0; /* copy these values into local vars for speed in inner loop */ int var1ndigits = var1->ndigits; int var2ndigits = var2->ndigits; NumericDigit *var1digits = var1->digits; NumericDigit *var2digits = var2->digits; res_weight = var1->weight; res_dscale = Max(var1->dscale, var2->dscale); /* Note: here we are figuring rscale in base-NBASE digits */ rscale1 = var1->ndigits - var1->weight - 1; rscale2 = var2->ndigits - var2->weight - 1; res_rscale = Max(rscale1, rscale2); res_ndigits = res_rscale + res_weight + 1; if (res_ndigits <= 0) res_ndigits = 1; res_buf = digitbuf_alloc(res_ndigits + 1); res_buf[0] = 0; /* spare digit for later rounding */ res_digits = res_buf + 1; i1 = res_rscale + var1->weight + 1; i2 = res_rscale + var2->weight + 1; for (i = res_ndigits - 1; i >= 0; i--) { i1--; i2--; if (i1 >= 0 && i1 < var1ndigits) borrow += var1digits[i1]; if (i2 >= 0 && i2 < var2ndigits) borrow -= var2digits[i2]; if (borrow < 0) { res_digits[i] = borrow + NBASE; borrow = -1; } else { res_digits[i] = borrow; borrow = 0; } } Assert(borrow == 0); /* else caller gave us var1 < var2 */ digitbuf_free(result->buf); result->ndigits = res_ndigits; result->buf = res_buf; result->digits = res_digits; result->weight = res_weight; result->dscale = res_dscale; /* Remove leading/trailing zeroes */ strip_var(result); } /* * round_var * * Round the value of a variable to no more than rscale decimal digits * after the decimal point. NOTE: we allow rscale < 0 here, implying * rounding before the decimal point. */ static void round_var(NumericVar *var, int rscale) { NumericDigit *digits = var->digits; int di; int ndigits; int carry; var->dscale = rscale; /* decimal digits wanted */ di = (var->weight + 1) * DEC_DIGITS + rscale; /* * If di = 0, the value loses all digits, but could round up to 1 if its * first extra digit is >= 5. If di < 0 the result must be 0. */ if (di < 0) { var->ndigits = 0; var->weight = 0; var->sign = NUMERIC_POS; } else { /* NBASE digits wanted */ ndigits = (di + DEC_DIGITS - 1) / DEC_DIGITS; /* 0, or number of decimal digits to keep in last NBASE digit */ di %= DEC_DIGITS; if (ndigits < var->ndigits || (ndigits == var->ndigits && di > 0)) { var->ndigits = ndigits; #if DEC_DIGITS == 1 /* di must be zero */ carry = (digits[ndigits] >= HALF_NBASE) ? 1 : 0; #else if (di == 0) carry = (digits[ndigits] >= HALF_NBASE) ? 1 : 0; else { /* Must round within last NBASE digit */ int extra, pow10; #if DEC_DIGITS == 4 pow10 = round_powers[di]; #elif DEC_DIGITS == 2 pow10 = 10; #else #error unsupported NBASE #endif extra = digits[--ndigits] % pow10; digits[ndigits] -= extra; carry = 0; if (extra >= pow10 / 2) { pow10 += digits[ndigits]; if (pow10 >= NBASE) { pow10 -= NBASE; carry = 1; } digits[ndigits] = pow10; } } #endif /* Propagate carry if needed */ while (carry) { carry += digits[--ndigits]; if (carry >= NBASE) { digits[ndigits] = carry - NBASE; carry = 1; } else { digits[ndigits] = carry; carry = 0; } } if (ndigits < 0) { Assert(ndigits == -1); /* better not have added > 1 digit */ Assert(var->digits > var->buf); var->digits--; var->ndigits++; var->weight++; } } } } /* * trunc_var * * Truncate (towards zero) the value of a variable at rscale decimal digits * after the decimal point. NOTE: we allow rscale < 0 here, implying * truncation before the decimal point. */ static void trunc_var(NumericVar *var, int rscale) { int di; int ndigits; var->dscale = rscale; /* decimal digits wanted */ di = (var->weight + 1) * DEC_DIGITS + rscale; /* * If di <= 0, the value loses all digits. */ if (di <= 0) { var->ndigits = 0; var->weight = 0; var->sign = NUMERIC_POS; } else { /* NBASE digits wanted */ ndigits = (di + DEC_DIGITS - 1) / DEC_DIGITS; if (ndigits <= var->ndigits) { var->ndigits = ndigits; #if DEC_DIGITS == 1 /* no within-digit stuff to worry about */ #else /* 0, or number of decimal digits to keep in last NBASE digit */ di %= DEC_DIGITS; if (di > 0) { /* Must truncate within last NBASE digit */ NumericDigit *digits = var->digits; int extra, pow10; #if DEC_DIGITS == 4 pow10 = round_powers[di]; #elif DEC_DIGITS == 2 pow10 = 10; #else #error unsupported NBASE #endif extra = digits[--ndigits] % pow10; digits[ndigits] -= extra; } #endif } } } /* * strip_var * * Strip any leading and trailing zeroes from a numeric variable */ static void strip_var(NumericVar *var) { NumericDigit *digits = var->digits; int ndigits = var->ndigits; /* Strip leading zeroes */ while (ndigits > 0 && *digits == 0) { digits++; var->weight--; ndigits--; } /* Strip trailing zeroes */ while (ndigits > 0 && digits[ndigits - 1] == 0) ndigits--; /* If it's zero, normalize the sign and weight */ if (ndigits == 0) { var->sign = NUMERIC_POS; var->weight = 0; } var->digits = digits; var->ndigits = ndigits; }