pgbench: introduce a RandomState struct

This becomes useful when used to retry a transaction after a
serialization error or deadlock abort.  (We don't yet have that feature,
but this is preparation for it.)

While at it, use separate random state for thread administratrivia such
as deciding which script to run, how long to delay for throttling, or
whether to log a message when sampling; this not only makes these tasks
independent of each other, but makes the actual thread run
deterministic.

Author: Marina Polyakova
Reviewed-by: Fabien Coelho
Discussion: https://postgr.es/m/72a0d590d6ba06f242d75c2e641820ec@postgrespro.ru
This commit is contained in:
Alvaro Herrera 2018-11-16 15:43:40 -03:00
parent a7aa608e0f
commit 4092319194
1 changed files with 74 additions and 32 deletions

View File

@ -279,6 +279,14 @@ typedef struct StatsData
SimpleStats lag;
} StatsData;
/*
* Struct to keep random state.
*/
typedef struct RandomState
{
unsigned short xseed[3];
} RandomState;
/*
* Connection state machine states.
*/
@ -360,6 +368,12 @@ typedef struct
ConnectionStateEnum state; /* state machine's current state. */
ConditionalStack cstack; /* enclosing conditionals state */
/*
* Separate randomness for each client. This is used for random functions
* PGBENCH_RANDOM_* during the execution of the script.
*/
RandomState cs_func_rs;
int use_file; /* index in sql_script for this client */
int command; /* command number in script */
@ -419,7 +433,16 @@ typedef struct
pthread_t thread; /* thread handle */
CState *state; /* array of CState */
int nstate; /* length of state[] */
unsigned short random_state[3]; /* separate randomness for each thread */
/*
* Separate randomness for each thread. Each thread option uses its own
* random state to make all of them independent of each other and therefore
* deterministic at the thread level.
*/
RandomState ts_choose_rs; /* random state for selecting a script */
RandomState ts_throttle_rs; /* random state for transaction throttling */
RandomState ts_sample_rs; /* random state for log sampling */
int64 throttle_trigger; /* previous/next throttling (us) */
FILE *logfile; /* where to log, or NULL */
ZipfCache zipf_cache; /* for thread-safe zipfian random number
@ -769,9 +792,20 @@ strtodouble(const char *str, bool errorOK, double *dv)
return true;
}
/*
* Initialize a random state struct.
*/
static void
initRandomState(RandomState *random_state)
{
random_state->xseed[0] = random();
random_state->xseed[1] = random();
random_state->xseed[2] = random();
}
/* random number generator: uniform distribution from min to max inclusive */
static int64
getrand(TState *thread, int64 min, int64 max)
getrand(RandomState *random_state, int64 min, int64 max)
{
/*
* Odd coding is so that min and max have approximately the same chance of
@ -782,7 +816,7 @@ getrand(TState *thread, int64 min, int64 max)
* protected by a mutex, and therefore a bottleneck on machines with many
* CPUs.
*/
return min + (int64) ((max - min + 1) * pg_erand48(thread->random_state));
return min + (int64) ((max - min + 1) * pg_erand48(random_state->xseed));
}
/*
@ -791,7 +825,8 @@ getrand(TState *thread, int64 min, int64 max)
* value is exp(-parameter).
*/
static int64
getExponentialRand(TState *thread, int64 min, int64 max, double parameter)
getExponentialRand(RandomState *random_state, int64 min, int64 max,
double parameter)
{
double cut,
uniform,
@ -801,7 +836,7 @@ getExponentialRand(TState *thread, int64 min, int64 max, double parameter)
Assert(parameter > 0.0);
cut = exp(-parameter);
/* erand in [0, 1), uniform in (0, 1] */
uniform = 1.0 - pg_erand48(thread->random_state);
uniform = 1.0 - pg_erand48(random_state->xseed);
/*
* inner expression in (cut, 1] (if parameter > 0), rand in [0, 1)
@ -814,7 +849,8 @@ getExponentialRand(TState *thread, int64 min, int64 max, double parameter)
/* random number generator: gaussian distribution from min to max inclusive */
static int64
getGaussianRand(TState *thread, int64 min, int64 max, double parameter)
getGaussianRand(RandomState *random_state, int64 min, int64 max,
double parameter)
{
double stdev;
double rand;
@ -842,8 +878,8 @@ getGaussianRand(TState *thread, int64 min, int64 max, double parameter)
* are expected in (0, 1] (see
* https://en.wikipedia.org/wiki/Box-Muller_transform)
*/
double rand1 = 1.0 - pg_erand48(thread->random_state);
double rand2 = 1.0 - pg_erand48(thread->random_state);
double rand1 = 1.0 - pg_erand48(random_state->xseed);
double rand2 = 1.0 - pg_erand48(random_state->xseed);
/* Box-Muller basic form transform */
double var_sqrt = sqrt(-2.0 * log(rand1));
@ -873,7 +909,7 @@ getGaussianRand(TState *thread, int64 min, int64 max, double parameter)
* not be one.
*/
static int64
getPoissonRand(TState *thread, double center)
getPoissonRand(RandomState *random_state, double center)
{
/*
* Use inverse transform sampling to generate a value > 0, such that the
@ -882,7 +918,7 @@ getPoissonRand(TState *thread, double center)
double uniform;
/* erand in [0, 1), uniform in (0, 1] */
uniform = 1.0 - pg_erand48(thread->random_state);
uniform = 1.0 - pg_erand48(random_state->xseed);
return (int64) (-log(uniform) * center + 0.5);
}
@ -960,7 +996,7 @@ zipfFindOrCreateCacheCell(ZipfCache *cache, int64 n, double s)
* Luc Devroye, p. 550-551, Springer 1986.
*/
static int64
computeIterativeZipfian(TState *thread, int64 n, double s)
computeIterativeZipfian(RandomState *random_state, int64 n, double s)
{
double b = pow(2.0, s - 1.0);
double x,
@ -971,8 +1007,8 @@ computeIterativeZipfian(TState *thread, int64 n, double s)
while (true)
{
/* random variates */
u = pg_erand48(thread->random_state);
v = pg_erand48(thread->random_state);
u = pg_erand48(random_state->xseed);
v = pg_erand48(random_state->xseed);
x = floor(pow(u, -1.0 / (s - 1.0)));
@ -990,10 +1026,11 @@ computeIterativeZipfian(TState *thread, int64 n, double s)
* Jim Gray et al, SIGMOD 1994
*/
static int64
computeHarmonicZipfian(TState *thread, int64 n, double s)
computeHarmonicZipfian(ZipfCache *zipf_cache, RandomState *random_state,
int64 n, double s)
{
ZipfCell *cell = zipfFindOrCreateCacheCell(&thread->zipf_cache, n, s);
double uniform = pg_erand48(thread->random_state);
ZipfCell *cell = zipfFindOrCreateCacheCell(zipf_cache, n, s);
double uniform = pg_erand48(random_state->xseed);
double uz = uniform * cell->harmonicn;
if (uz < 1.0)
@ -1005,17 +1042,17 @@ computeHarmonicZipfian(TState *thread, int64 n, double s)
/* random number generator: zipfian distribution from min to max inclusive */
static int64
getZipfianRand(TState *thread, int64 min, int64 max, double s)
getZipfianRand(ZipfCache *zipf_cache, RandomState *random_state, int64 min,
int64 max, double s)
{
int64 n = max - min + 1;
/* abort if parameter is invalid */
Assert(s > 0.0 && s != 1.0 && s <= MAX_ZIPFIAN_PARAM);
return min - 1 + ((s > 1)
? computeIterativeZipfian(thread, n, s)
: computeHarmonicZipfian(thread, n, s));
? computeIterativeZipfian(random_state, n, s)
: computeHarmonicZipfian(zipf_cache, random_state, n, s));
}
/*
@ -2310,7 +2347,7 @@ evalStandardFunc(TState *thread, CState *st,
if (func == PGBENCH_RANDOM)
{
Assert(nargs == 2);
setIntValue(retval, getrand(thread, imin, imax));
setIntValue(retval, getrand(&st->cs_func_rs, imin, imax));
}
else /* gaussian & exponential */
{
@ -2332,7 +2369,8 @@ evalStandardFunc(TState *thread, CState *st,
}
setIntValue(retval,
getGaussianRand(thread, imin, imax, param));
getGaussianRand(&st->cs_func_rs,
imin, imax, param));
}
else if (func == PGBENCH_RANDOM_ZIPFIAN)
{
@ -2344,7 +2382,9 @@ evalStandardFunc(TState *thread, CState *st,
return false;
}
setIntValue(retval,
getZipfianRand(thread, imin, imax, param));
getZipfianRand(&thread->zipf_cache,
&st->cs_func_rs,
imin, imax, param));
}
else /* exponential */
{
@ -2357,7 +2397,8 @@ evalStandardFunc(TState *thread, CState *st,
}
setIntValue(retval,
getExponentialRand(thread, imin, imax, param));
getExponentialRand(&st->cs_func_rs,
imin, imax, param));
}
}
@ -2652,7 +2693,7 @@ chooseScript(TState *thread)
if (num_scripts == 1)
return 0;
w = getrand(thread, 0, total_weight - 1);
w = getrand(&thread->ts_choose_rs, 0, total_weight - 1);
do
{
w -= sql_script[i++].weight;
@ -2846,7 +2887,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
* away.
*/
Assert(throttle_delay > 0);
wait = getPoissonRand(thread, throttle_delay);
wait = getPoissonRand(&thread->ts_throttle_rs, throttle_delay);
thread->throttle_trigger += wait;
st->txn_scheduled = thread->throttle_trigger;
@ -2880,7 +2921,8 @@ doCustom(TState *thread, CState *st, StatsData *agg)
{
processXactStats(thread, st, &now, true, agg);
/* next rendez-vous */
wait = getPoissonRand(thread, throttle_delay);
wait = getPoissonRand(&thread->ts_throttle_rs,
throttle_delay);
thread->throttle_trigger += wait;
st->txn_scheduled = thread->throttle_trigger;
}
@ -3423,7 +3465,7 @@ doLog(TState *thread, CState *st,
* to the random sample.
*/
if (sample_rate != 0.0 &&
pg_erand48(thread->random_state) > sample_rate)
pg_erand48(thread->ts_sample_rs.xseed) > sample_rate)
return;
/* should we aggregate the results or not? */
@ -4851,7 +4893,6 @@ set_random_seed(const char *seed)
return true;
}
int
main(int argc, char **argv)
{
@ -5465,6 +5506,7 @@ main(int argc, char **argv)
for (i = 0; i < nclients; i++)
{
state[i].cstack = conditional_stack_create();
initRandomState(&state[i].cs_func_rs);
}
if (debug)
@ -5598,9 +5640,9 @@ main(int argc, char **argv)
thread->state = &state[nclients_dealt];
thread->nstate =
(nclients - nclients_dealt + nthreads - i - 1) / (nthreads - i);
thread->random_state[0] = random();
thread->random_state[1] = random();
thread->random_state[2] = random();
initRandomState(&thread->ts_choose_rs);
initRandomState(&thread->ts_throttle_rs);
initRandomState(&thread->ts_sample_rs);
thread->logfile = NULL; /* filled in later */
thread->latency_late = 0;
thread->zipf_cache.nb_cells = 0;