diff --git a/doc/src/sgml/ref/pgbench.sgml b/doc/src/sgml/ref/pgbench.sgml
index 94b495e606..4431fc3eb7 100644
--- a/doc/src/sgml/ref/pgbench.sgml
+++ b/doc/src/sgml/ref/pgbench.sgml
@@ -1092,6 +1092,14 @@ pgbench options d
random_gaussian(1, 10, 2.5)
an integer between 1 and 10
+
+ random_zipfian(lb, ub, parameter)
+ integer
+ Zipfian-distributed random integer in [lb, ub],
+ see below
+ random_zipfian(1, 10, 1.5)
+ an integer between 1 and 10
+
sqrt(x)
double
@@ -1173,6 +1181,27 @@ f(x) = PHI(2.0 * parameter * (x - mu) / (max - min + 1)) /
of the Box-Muller transform.
+
+
+ random_zipfian generates an approximated bounded zipfian
+ distribution. For parameter in (0, 1), an
+ approximated algorithm is taken from
+ "Quickly Generating Billion-Record Synthetic Databases",
+ Jim Gray et al, SIGMOD 1994. For parameter
+ in (1, 1000), a rejection method is used, based on
+ "Non-Uniform Random Variate Generation", Luc Devroye, p. 550-551,
+ Springer 1986. The distribution is not defined when the parameter's
+ value is 1.0. The drawing performance is poor for parameter values
+ close and above 1.0 and on a small range.
+
+
+ parameter
+ defines how skewed the distribution is. The larger the parameter, the more
+ frequently values to the beginning of the interval are drawn.
+ The closer to 0 parameter is,
+ the flatter (more uniform) the access distribution.
+
+
diff --git a/src/bin/pgbench/exprparse.y b/src/bin/pgbench/exprparse.y
index b3a2d9bfd3..25d5ad48e5 100644
--- a/src/bin/pgbench/exprparse.y
+++ b/src/bin/pgbench/exprparse.y
@@ -191,6 +191,9 @@ static const struct
{
"random_exponential", 3, PGBENCH_RANDOM_EXPONENTIAL
},
+ {
+ "random_zipfian", 3, PGBENCH_RANDOM_ZIPFIAN
+ },
/* keep as last array element */
{
NULL, 0, 0
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index bd96eae5e6..7ce6f607f5 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -95,7 +95,10 @@ static int pthread_join(pthread_t th, void **thread_return);
#define LOG_STEP_SECONDS 5 /* seconds between log messages */
#define DEFAULT_NXACTS 10 /* default nxacts */
+#define ZIPF_CACHE_SIZE 15 /* cache cells number */
+
#define MIN_GAUSSIAN_PARAM 2.0 /* minimum parameter for gauss */
+#define MAX_ZIPFIAN_PARAM 1000 /* maximum parameter for zipfian */
int nxacts = 0; /* number of transactions per client */
int duration = 0; /* duration in seconds */
@@ -330,6 +333,35 @@ typedef struct
int ecnt; /* error count */
} CState;
+/*
+ * Cache cell for zipfian_random call
+ */
+typedef struct
+{
+ /* cell keys */
+ double s; /* s - parameter of zipfan_random function */
+ int64 n; /* number of elements in range (max - min + 1) */
+
+ double harmonicn; /* generalizedHarmonicNumber(n, s) */
+ double alpha;
+ double beta;
+ double eta;
+
+ uint64 last_used; /* last used logical time */
+} ZipfCell;
+
+/*
+ * Zipf cache for zeta values
+ */
+typedef struct
+{
+ uint64 current; /* counter for LRU cache replacement algorithm */
+
+ int nb_cells; /* number of filled cells */
+ int overflowCount; /* number of cache overflows */
+ ZipfCell cells[ZIPF_CACHE_SIZE];
+} ZipfCache;
+
/*
* Thread state
*/
@@ -342,6 +374,8 @@ typedef struct
unsigned short random_state[3]; /* separate randomness for each thread */
int64 throttle_trigger; /* previous/next throttling (us) */
FILE *logfile; /* where to log, or NULL */
+ ZipfCache zipf_cache; /* for thread-safe zipfian random number
+ * generation */
/* per thread collected stats */
instr_time start_time; /* thread start time */
@@ -746,6 +780,137 @@ getPoissonRand(TState *thread, int64 center)
return (int64) (-log(uniform) * ((double) center) + 0.5);
}
+/* helper function for getZipfianRand */
+static double
+generalizedHarmonicNumber(int64 n, double s)
+{
+ int i;
+ double ans = 0.0;
+
+ for (i = n; i > 1; i--)
+ ans += pow(i, -s);
+ return ans + 1.0;
+}
+
+/* set harmonicn and other parameters to cache cell */
+static void
+zipfSetCacheCell(ZipfCell * cell, int64 n, double s)
+{
+ double harmonic2;
+
+ cell->n = n;
+ cell->s = s;
+
+ harmonic2 = generalizedHarmonicNumber(2, s);
+ cell->harmonicn = generalizedHarmonicNumber(n, s);
+
+ cell->alpha = 1.0 / (1.0 - s);
+ cell->beta = pow(0.5, s);
+ cell->eta = (1.0 - pow(2.0 / n, 1.0 - s)) / (1.0 - harmonic2 / cell->harmonicn);
+}
+
+/*
+ * search for cache cell with keys (n, s)
+ * and create new cell if it does not exist
+ */
+static ZipfCell *
+zipfFindOrCreateCacheCell(ZipfCache * cache, int64 n, double s)
+{
+ int i,
+ least_recently_used = 0;
+ ZipfCell *cell;
+
+ /* search cached cell for given parameters */
+ for (i = 0; i < cache->nb_cells; i++)
+ {
+ cell = &cache->cells[i];
+ if (cell->n == n && cell->s == s)
+ return &cache->cells[i];
+
+ if (cell->last_used < cache->cells[least_recently_used].last_used)
+ least_recently_used = i;
+ }
+
+ /* create new one if it does not exist */
+ if (cache->nb_cells < ZIPF_CACHE_SIZE)
+ i = cache->nb_cells++;
+ else
+ {
+ /* replace LRU cell if cache is full */
+ i = least_recently_used;
+ cache->overflowCount++;
+ }
+
+ zipfSetCacheCell(&cache->cells[i], n, s);
+
+ cache->cells[i].last_used = cache->current++;
+ return &cache->cells[i];
+}
+
+/*
+ * Computing zipfian using rejection method, based on
+ * "Non-Uniform Random Variate Generation",
+ * Luc Devroye, p. 550-551, Springer 1986.
+ */
+static int64
+computeIterativeZipfian(TState *thread, int64 n, double s)
+{
+ double b = pow(2.0, s - 1.0);
+ double x,
+ t,
+ u,
+ v;
+
+ while (true)
+ {
+ /* random variates */
+ u = pg_erand48(thread->random_state);
+ v = pg_erand48(thread->random_state);
+
+ x = floor(pow(u, -1.0 / (s - 1.0)));
+
+ t = pow(1.0 + 1.0 / x, s - 1.0);
+ /* reject if too large or out of bound */
+ if (v * x * (t - 1.0) / (b - 1.0) <= t / b && x <= n)
+ break;
+ }
+ return (int64) x;
+}
+
+/*
+ * Computing zipfian using harmonic numbers, based on algorithm described in
+ * "Quickly Generating Billion-Record Synthetic Databases",
+ * Jim Gray et al, SIGMOD 1994
+ */
+static int64
+computeHarmonicZipfian(TState *thread, int64 n, double s)
+{
+ ZipfCell *cell = zipfFindOrCreateCacheCell(&thread->zipf_cache, n, s);
+ double uniform = pg_erand48(thread->random_state);
+ double uz = uniform * cell->harmonicn;
+
+ if (uz < 1.0)
+ return 1;
+ if (uz < 1.0 + cell->beta)
+ return 2;
+ return 1 + (int64) (cell->n * pow(cell->eta * uniform - cell->eta + 1.0, cell->alpha));
+}
+
+/* random number generator: zipfian distribution from min to max inclusive */
+static int64
+getZipfianRand(TState *thread, int64 min, int64 max, double s)
+{
+ int64 n = max - min + 1;
+
+ /* abort if parameter is invalid */
+ Assert(s > 0.0 && s != 1.0 && s <= MAX_ZIPFIAN_PARAM);
+
+
+ return min - 1 + ((s > 1)
+ ? computeIterativeZipfian(thread, n, s)
+ : computeHarmonicZipfian(thread, n, s));
+}
+
/*
* Initialize the given SimpleStats struct to all zeroes
*/
@@ -1303,7 +1468,6 @@ coerceToDouble(PgBenchValue *pval, double *dval)
return true;
}
}
-
/* assign an integer value */
static void
setIntValue(PgBenchValue *pv, int64 ival)
@@ -1605,6 +1769,7 @@ evalFunc(TState *thread, CState *st,
case PGBENCH_RANDOM:
case PGBENCH_RANDOM_EXPONENTIAL:
case PGBENCH_RANDOM_GAUSSIAN:
+ case PGBENCH_RANDOM_ZIPFIAN:
{
int64 imin,
imax;
@@ -1655,6 +1820,18 @@ evalFunc(TState *thread, CState *st,
setIntValue(retval,
getGaussianRand(thread, imin, imax, param));
}
+ else if (func == PGBENCH_RANDOM_ZIPFIAN)
+ {
+ if (param <= 0.0 || param == 1.0 || param > MAX_ZIPFIAN_PARAM)
+ {
+ fprintf(stderr,
+ "zipfian parameter must be in range (0, 1) U (1, %d]"
+ " (got %f)\n", MAX_ZIPFIAN_PARAM, param);
+ return false;
+ }
+ setIntValue(retval,
+ getZipfianRand(thread, imin, imax, param));
+ }
else /* exponential */
{
if (param <= 0.0)
@@ -3683,6 +3860,8 @@ printResults(TState *threads, StatsData *total, instr_time total_time,
tps_include,
tps_exclude;
int64 ntx = total->cnt - total->skipped;
+ int i,
+ totalCacheOverflows = 0;
time_include = INSTR_TIME_GET_DOUBLE(total_time);
@@ -3710,6 +3889,15 @@ printResults(TState *threads, StatsData *total, instr_time total_time,
printf("number of transactions actually processed: " INT64_FORMAT "\n",
ntx);
}
+ /* Report zipfian cache overflow */
+ for (i = 0; i < nthreads; i++)
+ {
+ totalCacheOverflows += threads[i].zipf_cache.overflowCount;
+ }
+ if (totalCacheOverflows > 0)
+ {
+ printf("zipfian cache array overflowed %d time(s)\n", totalCacheOverflows);
+ }
/* Remaining stats are nonsensical if we failed to execute any xacts */
if (total->cnt <= 0)
@@ -4513,6 +4701,9 @@ main(int argc, char **argv)
thread->random_state[2] = random();
thread->logfile = NULL; /* filled in later */
thread->latency_late = 0;
+ thread->zipf_cache.nb_cells = 0;
+ thread->zipf_cache.current = 0;
+ thread->zipf_cache.overflowCount = 0;
initStats(&thread->stats, 0);
nclients_dealt += thread->nstate;
diff --git a/src/bin/pgbench/pgbench.h b/src/bin/pgbench/pgbench.h
index fd428af274..83fee1ae74 100644
--- a/src/bin/pgbench/pgbench.h
+++ b/src/bin/pgbench/pgbench.h
@@ -75,7 +75,8 @@ typedef enum PgBenchFunction
PGBENCH_SQRT,
PGBENCH_RANDOM,
PGBENCH_RANDOM_GAUSSIAN,
- PGBENCH_RANDOM_EXPONENTIAL
+ PGBENCH_RANDOM_EXPONENTIAL,
+ PGBENCH_RANDOM_ZIPFIAN
} PgBenchFunction;
typedef struct PgBenchExpr PgBenchExpr;
diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl
index c095881312..e3cdf28628 100644
--- a/src/bin/pgbench/t/001_pgbench_with_server.pl
+++ b/src/bin/pgbench/t/001_pgbench_with_server.pl
@@ -231,7 +231,8 @@ pgbench(
qr{command=18.: double 18\b},
qr{command=19.: double 19\b},
qr{command=20.: double 20\b},
- qr{command=21.: int 9223372036854775807\b}, ],
+ qr{command=21.: int 9223372036854775807\b},
+ qr{command=23.: int [1-9]\b}, ],
'pgbench expressions',
{ '001_pgbench_expressions' => q{-- integer functions
\set i1 debug(random(1, 100))
@@ -261,6 +262,8 @@ pgbench(
\set maxint debug(:minint - 1)
-- reset a variable
\set i1 0
+-- yet another integer function
+\set id debug(random_zipfian(1, 9, 1.3))
} });
# backslash commands
@@ -371,6 +374,14 @@ SELECT LEAST(:i, :i, :i, :i, :i, :i, :i, :i, :i, :i, :i);
0,
[qr{exponential parameter must be greater }],
q{\set i random_exponential(0, 10, 0.0)} ],
+ [ 'set zipfian param to 1',
+ 0,
+ [qr{zipfian parameter must be in range \(0, 1\) U \(1, \d+\]}],
+ q{\set i random_zipfian(0, 10, 1)} ],
+ [ 'set zipfian param too large',
+ 0,
+ [qr{zipfian parameter must be in range \(0, 1\) U \(1, \d+\]}],
+ q{\set i random_zipfian(0, 10, 1000000)} ],
[ 'set non numeric value', 0,
[qr{malformed variable "foo" value: "bla"}], q{\set i :foo + 1} ],
[ 'set no expression', 1, [qr{syntax error}], q{\set i} ],
@@ -412,6 +423,31 @@ for my $e (@errors)
{ $n => $script });
}
+# zipfian cache array overflow
+pgbench(
+ '-t 1', 0,
+ [ qr{processed: 1/1}, qr{zipfian cache array overflowed 1 time\(s\)} ],
+ [ qr{^} ],
+ 'pgbench zipfian array overflow on random_zipfian',
+ { '001_pgbench_random_zipfian' => q{
+\set i random_zipfian(1, 100, 0.5)
+\set i random_zipfian(2, 100, 0.5)
+\set i random_zipfian(3, 100, 0.5)
+\set i random_zipfian(4, 100, 0.5)
+\set i random_zipfian(5, 100, 0.5)
+\set i random_zipfian(6, 100, 0.5)
+\set i random_zipfian(7, 100, 0.5)
+\set i random_zipfian(8, 100, 0.5)
+\set i random_zipfian(9, 100, 0.5)
+\set i random_zipfian(10, 100, 0.5)
+\set i random_zipfian(11, 100, 0.5)
+\set i random_zipfian(12, 100, 0.5)
+\set i random_zipfian(13, 100, 0.5)
+\set i random_zipfian(14, 100, 0.5)
+\set i random_zipfian(15, 100, 0.5)
+\set i random_zipfian(16, 100, 0.5)
+} });
+
# throttling
pgbench(
'-t 100 -S --rate=100000 --latency-limit=1000000 -c 2 -n -r',