From b60376649600268b75e1e8be86a6a6fe7fb9b3c3 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera Date: Fri, 29 Jan 2016 13:05:08 +0100 Subject: [PATCH] pgbench: refactor handling of stats tracking MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This doesn't add any functionality but just shuffles things around so that it can be reused and improved later. Author: Fabien Coelho Reviewed-by: Michael Paquier, Álvaro Herrera --- src/bin/pgbench/pgbench.c | 658 ++++++++++++++++---------------------- 1 file changed, 278 insertions(+), 380 deletions(-) diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index d5f242c23f..44da3d19c1 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -166,10 +166,8 @@ int agg_interval; /* log aggregates instead of individual * transactions */ int progress = 0; /* thread progress report every this seconds */ bool progress_timestamp = false; /* progress report with Unix time */ -int progress_nclients = 0; /* number of clients for progress - * report */ -int progress_nthreads = 0; /* number of threads for progress - * report */ +int nclients = 1; /* number of clients */ +int nthreads = 1; /* number of threads */ bool is_connect; /* establish connection for each transaction */ bool is_latencies; /* report per-command latencies */ int main_pid; /* main process id used in log filename */ @@ -192,6 +190,35 @@ typedef struct #define MAX_SCRIPTS 128 /* max number of SQL scripts allowed */ #define SHELL_COMMAND_SIZE 256 /* maximum size allowed for shell command */ +/* + * Simple data structure to keep stats about something. + * + * XXX probably the first value should be kept and used as an offset for + * better numerical stability... 
+ */ +typedef struct SimpleStats +{ + int64 count; /* how many values were encountered */ + double min; /* the minimum seen */ + double max; /* the maximum seen */ + double sum; /* sum of values */ + double sum2; /* sum of squared values */ +} SimpleStats; + +/* + * Data structure to hold various statistics: per-thread stats are maintained + * and merged together. + */ +typedef struct StatsData +{ + long start_time; /* interval start time, for aggregates */ + int64 cnt; /* number of transactions */ + int64 skipped; /* number of transactions skipped under --rate + * and --latency-limit */ + SimpleStats latency; + SimpleStats lag; +} StatsData; + /* * Connection state */ @@ -213,10 +240,8 @@ typedef struct bool prepared[MAX_SCRIPTS]; /* whether client prepared the script */ /* per client collected stats */ - int cnt; /* xacts count */ + int64 cnt; /* transaction count */ int ecnt; /* error count */ - int64 txn_latencies; /* cumulated latencies */ - int64 txn_sqlats; /* cumulated square latencies */ } CState; /* @@ -228,19 +253,14 @@ typedef struct pthread_t thread; /* thread handle */ CState *state; /* array of CState */ int nstate; /* length of state[] */ - instr_time start_time; /* thread start time */ - instr_time *exec_elapsed; /* time spent executing cmds (per Command) */ - int *exec_count; /* number of cmd executions (per Command) */ unsigned short random_state[3]; /* separate randomness for each thread */ int64 throttle_trigger; /* previous/next throttling (us) */ /* per thread collected stats */ + instr_time start_time; /* thread start time */ instr_time conn_time; - int64 throttle_lag; /* total transaction lag behind throttling */ - int64 throttle_lag_max; /* max transaction lag */ - int64 throttle_latency_skipped; /* lagging transactions - * skipped */ - int64 latency_late; /* late transactions */ + StatsData stats; + int64 latency_late; /* executed but late transactions */ } TState; #define INVALID_THREAD ((pthread_t) 0) @@ -272,33 +292,14 @@ typedef 
struct char *argv[MAX_ARGS]; /* command word list */ int cols[MAX_ARGS]; /* corresponding column starting from 1 */ PgBenchExpr *expr; /* parsed expression */ + SimpleStats stats; /* time spent in this command */ } Command; -typedef struct -{ - - long start_time; /* when does the interval start */ - int cnt; /* number of transactions */ - int skipped; /* number of transactions skipped under --rate - * and --latency-limit */ - - double min_latency; /* min/max latencies */ - double max_latency; - double sum_latency; /* sum(latency), sum(latency^2) - for - * estimates */ - double sum2_latency; - - double min_lag; - double max_lag; - double sum_lag; /* sum(lag) */ - double sum2_lag; /* sum(lag*lag) */ -} AggVals; - static struct { const char *name; - Command **commands; -} sql_script[MAX_SCRIPTS]; /* SQL script files */ + Command **commands; +} sql_script[MAX_SCRIPTS]; /* SQL script files */ static int num_scripts; /* number of scripts in sql_script[] */ static int num_commands = 0; /* total number of Command structs */ static int debug = 0; /* debug flag */ @@ -362,8 +363,11 @@ static struct static void setalarm(int seconds); static void *threadRun(void *arg); +static void processXactStats(TState *thread, CState *st, instr_time *now, + bool skipped, FILE *logfile, StatsData *agg); static void doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, - AggVals *agg, bool skipped); + StatsData *agg, bool skipped, double latency, double lag); + static void usage(void) @@ -602,6 +606,82 @@ getPoissonRand(TState *thread, int64 center) return (int64) (-log(uniform) * ((double) center) + 0.5); } +/* + * Initialize the given SimpleStats struct to all zeroes + */ +static void +initSimpleStats(SimpleStats *ss) +{ + memset(ss, 0, sizeof(SimpleStats)); +} + +/* + * Accumulate one value into a SimpleStats struct. 
+ */ +static void +addToSimpleStats(SimpleStats *ss, double val) +{ + if (ss->count == 0 || val < ss->min) + ss->min = val; + if (ss->count == 0 || val > ss->max) + ss->max = val; + ss->count++; + ss->sum += val; + ss->sum2 += val * val; +} + +/* + * Merge two SimpleStats objects + */ +static void +mergeSimpleStats(SimpleStats *acc, SimpleStats *ss) +{ + if (acc->count == 0 || ss->min < acc->min) + acc->min = ss->min; + if (acc->count == 0 || ss->max > acc->max) + acc->max = ss->max; + acc->count += ss->count; + acc->sum += ss->sum; + acc->sum2 += ss->sum2; +} + +/* + * Initialize a StatsData struct to mostly zeroes, with its start time set to + * the given value. + */ +static void +initStats(StatsData *sd, double start_time) +{ + sd->start_time = start_time; + sd->cnt = 0; + sd->skipped = 0; + initSimpleStats(&sd->latency); + initSimpleStats(&sd->lag); +} + +/* + * Accumulate one additional item into the given stats object. + */ +static void +accumStats(StatsData *stats, bool skipped, double lat, double lag) +{ + stats->cnt++; + + if (skipped) + { + /* no latency to record on skipped transactions */ + stats->skipped++; + } + else + { + addToSimpleStats(&stats->latency, lat); + + /* and possibly the same for schedule lag */ + if (throttle_delay) + addToSimpleStats(&stats->lag, lag); + } +} + /* call PQexec() and exit() on failure */ static void executeStatement(PGconn *con, const char *sql) @@ -1121,30 +1201,6 @@ clientDone(CState *st, bool ok) return false; /* always false */ } -static void -agg_vals_init(AggVals *aggs, instr_time start) -{ - /* basic counters */ - aggs->cnt = 0; /* number of transactions (includes skipped) */ - aggs->skipped = 0; /* xacts skipped under --rate --latency-limit */ - - aggs->sum_latency = 0; /* SUM(latency) */ - aggs->sum2_latency = 0; /* SUM(latency*latency) */ - - /* min and max transaction duration */ - aggs->min_latency = 0; - aggs->max_latency = 0; - - /* schedule lag counters */ - aggs->sum_lag = 0; - aggs->sum2_lag = 0; - 
aggs->min_lag = 0; - aggs->max_lag = 0; - - /* start of the current interval */ - aggs->start_time = INSTR_TIME_GET_DOUBLE(start); -} - static int chooseScript(TState *thread) { @@ -1156,7 +1212,7 @@ chooseScript(TState *thread) /* return false iff client should be disconnected */ static bool -doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVals *agg) +doCustom(TState *thread, CState *st, FILE *logfile, StatsData *agg) { PGresult *res; Command **commands; @@ -1210,11 +1266,8 @@ top: now_us = INSTR_TIME_GET_MICROSEC(now); while (thread->throttle_trigger < now_us - latency_limit) { - thread->throttle_latency_skipped++; - - if (logfile) - doLog(thread, st, logfile, &now, agg, true); - + processXactStats(thread, st, &now, true, logfile, agg); + /* next rendez-vous */ wait = getPoissonRand(thread, throttle_delay); thread->throttle_trigger += wait; st->txn_scheduled = thread->throttle_trigger; @@ -1231,28 +1284,13 @@ top: if (st->sleeping) { /* are we sleeping? */ - int64 now_us; - if (INSTR_TIME_IS_ZERO(now)) INSTR_TIME_SET_CURRENT(now); - now_us = INSTR_TIME_GET_MICROSEC(now); - if (st->txn_scheduled <= now_us) - { - /* Done sleeping, go ahead with next command */ - st->sleeping = false; - if (st->throttling) - { - /* Measure lag of throttled transaction relative to target */ - int64 lag = now_us - st->txn_scheduled; - - thread->throttle_lag += lag; - if (lag > thread->throttle_lag_max) - thread->throttle_lag_max = lag; - st->throttling = false; - } - } - else + if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled) return true; /* Still sleeping, nothing to do here */ + /* Else done sleeping, go ahead with next command */ + st->sleeping = false; + st->throttling = false; } if (st->listen) @@ -1276,47 +1314,22 @@ top: */ if (is_latencies) { - int cnum = commands[st->state]->command_num; - if (INSTR_TIME_IS_ZERO(now)) INSTR_TIME_SET_CURRENT(now); - INSTR_TIME_ACCUM_DIFF(thread->exec_elapsed[cnum], - now, st->stmt_begin); - 
thread->exec_count[cnum]++; + + /* XXX could use a mutex here, but we choose not to */ + addToSimpleStats(&commands[st->state]->stats, + INSTR_TIME_GET_DOUBLE(now) - + INSTR_TIME_GET_DOUBLE(st->stmt_begin)); } /* transaction finished: calculate latency and log the transaction */ if (commands[st->state + 1] == NULL) { - /* only calculate latency if an option is used that needs it */ - if (progress || throttle_delay || latency_limit) - { - int64 latency; - - if (INSTR_TIME_IS_ZERO(now)) - INSTR_TIME_SET_CURRENT(now); - - latency = INSTR_TIME_GET_MICROSEC(now) - st->txn_scheduled; - - st->txn_latencies += latency; - - /* - * XXX In a long benchmark run of high-latency transactions, - * this int64 addition eventually overflows. For example, 100 - * threads running 10s transactions will overflow it in 2.56 - * hours. With a more-typical OLTP workload of .1s - * transactions, overflow would take 256 hours. - */ - st->txn_sqlats += latency * latency; - - /* record over the limit transactions if needed. 
*/ - if (latency_limit && latency > latency_limit) - thread->latency_late++; - } - - /* record the time it took in the log */ - if (logfile) - doLog(thread, st, logfile, &now, agg, false); + if (progress || throttle_delay || latency_limit || logfile) + processXactStats(thread, st, &now, false, logfile, agg); + else + thread->stats.cnt++; } if (commands[st->state]->type == SQL_COMMAND) @@ -1391,7 +1404,7 @@ top: return clientDone(st, false); } INSTR_TIME_SET_CURRENT(end); - INSTR_TIME_ACCUM_DIFF(*conn_time, end, start); + INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start); } /* @@ -1496,7 +1509,7 @@ top: st->ecnt++; } else - st->listen = true; /* flags that should be listened */ + st->listen = true; /* flags that should be listened */ } else if (commands[st->state]->type == META_COMMAND) { @@ -1734,6 +1747,8 @@ top: else /* succeeded */ st->listen = true; } + + /* after a meta command, immediately proceed with next command */ goto top; } @@ -1744,12 +1759,9 @@ top: * print log entry after completing one transaction. */ static void -doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg, - bool skipped) +doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, + StatsData *agg, bool skipped, double latency, double lag) { - double lag; - double latency; - /* * Skip the log entry if sampling is enabled and this row doesn't belong * to the random sample. @@ -1758,118 +1770,42 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg, pg_erand48(thread->random_state) > sample_rate) return; - if (INSTR_TIME_IS_ZERO(*now)) - INSTR_TIME_SET_CURRENT(*now); - - latency = (double) (INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled); - if (skipped) - lag = latency; - else - lag = (double) (INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled); - /* should we aggregate the results or not? */ if (agg_interval > 0) { /* - * Are we still in the same interval? 
If yes, accumulate the values - * (print them otherwise) + * Loop until we reach the interval of the current transaction, and + * print all the empty intervals in between (this may happen with very + * low tps, e.g. --rate=0.1). */ - if (agg->start_time + agg_interval >= INSTR_TIME_GET_DOUBLE(*now)) + while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(*now)) { - agg->cnt += 1; - if (skipped) + /* print aggregated report to logfile */ + fprintf(logfile, "%ld " INT64_FORMAT " %.0f %.0f %.0f %.0f", + agg->start_time, + agg->cnt, + agg->latency.sum, + agg->latency.sum2, + agg->latency.min, + agg->latency.max); + if (throttle_delay) { - /* - * there is no latency to record if the transaction was - * skipped - */ - agg->skipped += 1; + fprintf(logfile, " %.0f %.0f %.0f %.0f", + agg->lag.sum, + agg->lag.sum2, + agg->lag.min, + agg->lag.max); + if (latency_limit) + fprintf(logfile, " " INT64_FORMAT, agg->skipped); } - else - { - agg->sum_latency += latency; - agg->sum2_latency += latency * latency; + fputc('\n', logfile); - /* first in this aggregation interval */ - if ((agg->cnt == 1) || (latency < agg->min_latency)) - agg->min_latency = latency; - - if ((agg->cnt == 1) || (latency > agg->max_latency)) - agg->max_latency = latency; - - /* and the same for schedule lag */ - if (throttle_delay) - { - agg->sum_lag += lag; - agg->sum2_lag += lag * lag; - - if ((agg->cnt == 1) || (lag < agg->min_lag)) - agg->min_lag = lag; - if ((agg->cnt == 1) || (lag > agg->max_lag)) - agg->max_lag = lag; - } - } + /* reset data and move to next interval */ + initStats(agg, agg->start_time + agg_interval); } - else - { - /* - * Loop until we reach the interval of the current transaction - * (and print all the empty intervals in between). - */ - while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(*now)) - { - /* - * This is a non-Windows branch (thanks to the ifdef in - * usage), so we don't need to handle this in a special way - * (see below). 
- */ - fprintf(logfile, "%ld %d %.0f %.0f %.0f %.0f", - agg->start_time, - agg->cnt, - agg->sum_latency, - agg->sum2_latency, - agg->min_latency, - agg->max_latency); - if (throttle_delay) - { - fprintf(logfile, " %.0f %.0f %.0f %.0f", - agg->sum_lag, - agg->sum2_lag, - agg->min_lag, - agg->max_lag); - if (latency_limit) - fprintf(logfile, " %d", agg->skipped); - } - fputc('\n', logfile); - /* move to the next inteval */ - agg->start_time = agg->start_time + agg_interval; - - /* reset for "no transaction" intervals */ - agg->cnt = 0; - agg->skipped = 0; - agg->min_latency = 0; - agg->max_latency = 0; - agg->sum_latency = 0; - agg->sum2_latency = 0; - agg->min_lag = 0; - agg->max_lag = 0; - agg->sum_lag = 0; - agg->sum2_lag = 0; - } - - /* reset the values to include only the current transaction. */ - agg->cnt = 1; - agg->skipped = skipped ? 1 : 0; - agg->min_latency = latency; - agg->max_latency = latency; - agg->sum_latency = skipped ? 0.0 : latency; - agg->sum2_latency = skipped ? 0.0 : latency * latency; - agg->min_lag = lag; - agg->max_lag = lag; - agg->sum_lag = lag; - agg->sum2_lag = lag * lag; - } + /* accumulate the current transaction */ + accumStats(agg, skipped, latency, lag); } else { @@ -1878,21 +1814,21 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg, /* This is more than we really ought to know about instr_time */ if (skipped) - fprintf(logfile, "%d %d skipped %d %ld %ld", + fprintf(logfile, "%d " INT64_FORMAT " skipped %d %ld %ld", st->id, st->cnt, st->use_file, (long) now->tv_sec, (long) now->tv_usec); else - fprintf(logfile, "%d %d %.0f %d %ld %ld", + fprintf(logfile, "%d " INT64_FORMAT " %.0f %d %ld %ld", st->id, st->cnt, latency, st->use_file, (long) now->tv_sec, (long) now->tv_usec); #else /* On Windows, instr_time doesn't provide a timestamp anyway */ if (skipped) - fprintf(logfile, "%d %d skipped %d 0 0", + fprintf(logfile, "%d " INT64_FORMAT " skipped %d 0 0", st->id, st->cnt, st->use_file); else - 
fprintf(logfile, "%d %d %.0f %d 0 0", + fprintf(logfile, "%d " INT64_FORMAT " %.0f %d 0 0", st->id, st->cnt, latency, st->use_file); #endif if (throttle_delay) @@ -1901,6 +1837,44 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg, } } +/* + * Accumulate and report statistics at end of a transaction. + * + * (This is also called when a transaction is late and thus skipped.) + */ +static void +processXactStats(TState *thread, CState *st, instr_time *now, + bool skipped, FILE *logfile, StatsData *agg) +{ + double latency = 0.0, + lag = 0.0; + + if ((!skipped || agg_interval) && INSTR_TIME_IS_ZERO(*now)) + INSTR_TIME_SET_CURRENT(*now); + + if (!skipped) + { + /* compute latency & lag */ + latency = INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled; + lag = INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled; + } + + if (progress || throttle_delay || latency_limit) + { + accumStats(&thread->stats, skipped, latency, lag); + + /* count transactions over the latency limit, if needed */ + if (latency_limit && latency > latency_limit) + thread->latency_late++; + } + else + thread->stats.cnt++; + + if (use_log) + doLog(thread, st, logfile, now, agg, skipped, latency, lag); +} + + /* discard connections */ static void disconnect_all(CState *state, int length) @@ -2297,6 +2271,7 @@ process_commands(char *buf, const char *source, const int lineno) my_commands->command_num = num_commands++; my_commands->type = 0; /* until set */ my_commands->argc = 0; + initSimpleStats(&my_commands->stats); if (*p == '\\') { @@ -2641,7 +2616,7 @@ process_builtin(const char *tb, const char *source) static void listAvailableScripts(void) { - int i; + int i; fprintf(stderr, "Available builtin scripts:\n"); for (i = 0; i < N_BUILTIN; i++) @@ -2689,22 +2664,29 @@ addScript(const char *name, Command **commands) num_scripts++; } +static void +printSimpleStats(char *prefix, SimpleStats *ss) +{ + /* print NaN if no transactions were executed */ + double latency = 
ss->sum / ss->count; + double stddev = sqrt(ss->sum2 / ss->count - latency * latency); + + printf("%s average = %.3f ms\n", prefix, 0.001 * latency); + printf("%s stddev = %.3f ms\n", prefix, 0.001 * stddev); +} + /* print out results */ static void -printResults(int64 normal_xacts, int nclients, - TState *threads, int nthreads, - instr_time total_time, instr_time conn_total_time, - int64 total_latencies, int64 total_sqlats, - int64 throttle_lag, int64 throttle_lag_max, - int64 throttle_latency_skipped, int64 latency_late) +printResults(TState *threads, StatsData *total, instr_time total_time, + instr_time conn_total_time, int latency_late) { double time_include, tps_include, tps_exclude; time_include = INSTR_TIME_GET_DOUBLE(total_time); - tps_include = normal_xacts / time_include; - tps_exclude = normal_xacts / (time_include - + tps_include = total->cnt / time_include; + tps_exclude = total->cnt / (time_include - (INSTR_TIME_GET_DOUBLE(conn_total_time) / nclients)); printf("transaction type: %s\n", @@ -2716,46 +2698,36 @@ printResults(int64 normal_xacts, int nclients, if (duration <= 0) { printf("number of transactions per client: %d\n", nxacts); - printf("number of transactions actually processed: " INT64_FORMAT "/" INT64_FORMAT "\n", - normal_xacts, (int64) nxacts * nclients); + printf("number of transactions actually processed: " INT64_FORMAT "/%d\n", + total->cnt, nxacts * nclients); } else { printf("duration: %d s\n", duration); printf("number of transactions actually processed: " INT64_FORMAT "\n", - normal_xacts); + total->cnt); } /* Remaining stats are nonsensical if we failed to execute any xacts */ - if (normal_xacts <= 0) + if (total->cnt <= 0) return; if (throttle_delay && latency_limit) printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n", - throttle_latency_skipped, - 100.0 * throttle_latency_skipped / (throttle_latency_skipped + normal_xacts)); + total->skipped, + 100.0 * total->skipped / (total->skipped + total->cnt)); if 
(latency_limit) - printf("number of transactions above the %.1f ms latency limit: " INT64_FORMAT " (%.3f %%)\n", + printf("number of transactions above the %.1f ms latency limit: %d (%.3f %%)\n", latency_limit / 1000.0, latency_late, - 100.0 * latency_late / (throttle_latency_skipped + normal_xacts)); + 100.0 * latency_late / (total->skipped + total->cnt)); if (throttle_delay || progress || latency_limit) - { - /* compute and show latency average and standard deviation */ - double latency = 0.001 * total_latencies / normal_xacts; - double sqlat = (double) total_sqlats / normal_xacts; - - printf("latency average: %.3f ms\n" - "latency stddev: %.3f ms\n", - latency, 0.001 * sqrt(sqlat - 1000000.0 * latency * latency)); - } + printSimpleStats("latency", &total->latency); else - { /* only an average latency computed from the duration is available */ printf("latency average: %.3f ms\n", - 1000.0 * duration * nclients / normal_xacts); - } + 1000.0 * duration * nclients / total->cnt); if (throttle_delay) { @@ -2766,7 +2738,7 @@ printResults(int64 normal_xacts, int nclients, * the database load, or the Poisson throttling process. 
*/ printf("rate limit schedule lag: avg %.3f (max %.3f) ms\n", - 0.001 * throttle_lag / normal_xacts, 0.001 * throttle_lag_max); + 0.001 * total->lag.sum / total->cnt, 0.001 * total->lag.max); } printf("tps = %f (including connections establishing)\n", tps_include); @@ -2785,33 +2757,9 @@ printResults(int64 normal_xacts, int nclients, printf(" - statement latencies in milliseconds:\n"); for (commands = sql_script[i].commands; *commands != NULL; commands++) - { - Command *command = *commands; - int cnum = command->command_num; - double total_time; - instr_time total_exec_elapsed; - int total_exec_count; - int t; - - /* Accumulate per-thread data for command */ - INSTR_TIME_SET_ZERO(total_exec_elapsed); - total_exec_count = 0; - for (t = 0; t < nthreads; t++) - { - TState *thread = &threads[t]; - - INSTR_TIME_ADD(total_exec_elapsed, - thread->exec_elapsed[cnum]); - total_exec_count += thread->exec_count[cnum]; - } - - if (total_exec_count > 0) - total_time = INSTR_TIME_GET_MILLISEC(total_exec_elapsed) / (double) total_exec_count; - else - total_time = 0.0; - - printf("\t%f\t%s\n", total_time, command->line); - } + printf(" %11.3f %s\n", + 1000.0 * (*commands)->stats.sum / (*commands)->stats.count, + (*commands)->line); } } } @@ -2860,8 +2808,6 @@ main(int argc, char **argv) }; int c; - int nclients = 1; /* default number of simulated clients */ - int nthreads = 1; /* default number of threads */ int is_init_mode = 0; /* initialize mode? */ int is_no_vacuum = 0; /* no vacuum at all before testing? */ int do_vacuum_accounts = 0; /* do vacuum accounts before testing? 
*/ @@ -2878,13 +2824,8 @@ main(int argc, char **argv) instr_time start_time; /* start up time */ instr_time total_time; instr_time conn_total_time; - int64 total_xacts = 0; - int64 total_latencies = 0; - int64 total_sqlats = 0; - int64 throttle_lag = 0; - int64 throttle_lag_max = 0; - int64 throttle_latency_skipped = 0; int64 latency_late = 0; + StatsData stats; char *desc; int i; @@ -3071,14 +3012,14 @@ main(int argc, char **argv) case 'S': addScript(desc, process_builtin(findBuiltin("select-only", &desc), - desc)); + desc)); benchmarking_option_set = true; internal_script_used = true; break; case 'N': addScript(desc, process_builtin(findBuiltin("simple-update", &desc), - desc)); + desc)); benchmarking_option_set = true; internal_script_used = true; break; @@ -3311,8 +3252,6 @@ main(int argc, char **argv) * changed after fork. */ main_pid = (int) getpid(); - progress_nclients = nclients; - progress_nthreads = nthreads; if (nclients > 1) { @@ -3454,32 +3393,10 @@ main(int argc, char **argv) thread->random_state[0] = random(); thread->random_state[1] = random(); thread->random_state[2] = random(); - thread->throttle_latency_skipped = 0; thread->latency_late = 0; + initStats(&thread->stats, 0.0); nclients_dealt += thread->nstate; - - if (is_latencies) - { - /* Reserve memory for the thread to store per-command latencies */ - int t; - - thread->exec_elapsed = (instr_time *) - pg_malloc(sizeof(instr_time) * num_commands); - thread->exec_count = (int *) - pg_malloc(sizeof(int) * num_commands); - - for (t = 0; t < num_commands; t++) - { - INSTR_TIME_SET_ZERO(thread->exec_elapsed[t]); - thread->exec_count[t] = 0; - } - } - else - { - thread->exec_elapsed = NULL; - thread->exec_count = NULL; - } } /* all clients must be assigned to a thread */ @@ -3522,11 +3439,11 @@ main(int argc, char **argv) #endif /* ENABLE_THREAD_SAFETY */ /* wait for threads and accumulate results */ + initStats(&stats, 0.0); INSTR_TIME_SET_ZERO(conn_total_time); for (i = 0; i < nthreads; i++) { 
TState *thread = &threads[i]; - int j; #ifdef ENABLE_THREAD_SAFETY if (threads[i].thread == INVALID_THREAD) @@ -3539,21 +3456,13 @@ main(int argc, char **argv) (void) threadRun(thread); #endif /* ENABLE_THREAD_SAFETY */ - /* thread level stats */ - throttle_lag += thread->throttle_lag; - throttle_latency_skipped += threads->throttle_latency_skipped; + /* aggregate thread level stats */ + mergeSimpleStats(&stats.latency, &thread->stats.latency); + mergeSimpleStats(&stats.lag, &thread->stats.lag); + stats.cnt += thread->stats.cnt; + stats.skipped += thread->stats.skipped; latency_late += thread->latency_late; - if (throttle_lag_max > thread->throttle_lag_max) - throttle_lag_max = thread->throttle_lag_max; INSTR_TIME_ADD(conn_total_time, thread->conn_time); - - /* client-level stats */ - for (j = 0; j < thread->nstate; j++) - { - total_xacts += thread->state[j].cnt; - total_latencies += thread->state[j].txn_latencies; - total_sqlats += thread->state[j].txn_sqlats; - } } disconnect_all(state, nclients); @@ -3569,10 +3478,7 @@ main(int argc, char **argv) */ INSTR_TIME_SET_CURRENT(total_time); INSTR_TIME_SUBTRACT(total_time, start_time); - printResults(total_xacts, nclients, threads, nthreads, - total_time, conn_total_time, total_latencies, total_sqlats, - throttle_lag, throttle_lag_max, throttle_latency_skipped, - latency_late); + printResults(threads, &stats, total_time, conn_total_time, latency_late); return 0; } @@ -3593,13 +3499,8 @@ threadRun(void *arg) int64 thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time); int64 last_report = thread_start; int64 next_report = last_report + (int64) progress * 1000000; - int64 last_count = 0, - last_lats = 0, - last_sqlats = 0, - last_lags = 0, - last_skipped = 0; - - AggVals aggs; + StatsData last, + aggs; /* * Initialize throttling rate target for all of the thread's clients. 
It @@ -3609,8 +3510,6 @@ threadRun(void *arg) */ INSTR_TIME_SET_CURRENT(start); thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start); - thread->throttle_lag = 0; - thread->throttle_lag_max = 0; INSTR_TIME_SET_ZERO(thread->conn_time); @@ -3647,7 +3546,8 @@ threadRun(void *arg) INSTR_TIME_SET_CURRENT(thread->conn_time); INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time); - agg_vals_init(&aggs, thread->start_time); + initStats(&aggs, INSTR_TIME_GET_DOUBLE(thread->start_time)); + last = aggs; /* send start up queries in async manner */ for (i = 0; i < nstate; i++) @@ -3661,7 +3561,7 @@ threadRun(void *arg) if (debug) fprintf(stderr, "client %d executing script \"%s\"\n", st->id, sql_script[st->use_file].name); - if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs)) + if (!doCustom(thread, st, logfile, &aggs)) remains--; /* I've aborted */ if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND) @@ -3800,7 +3700,7 @@ threadRun(void *arg) if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask) || commands[st->state]->type == META_COMMAND)) { - if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs)) + if (!doCustom(thread, st, logfile, &aggs)) remains--; /* I've aborted */ } @@ -3825,11 +3725,7 @@ threadRun(void *arg) if (now >= next_report) { /* generate and show report */ - int64 count = 0, - lats = 0, - sqlats = 0, - lags = 0, - skipped = 0; + StatsData cur; int64 run = now - last_report; double tps, total_run, @@ -3850,25 +3746,24 @@ threadRun(void *arg) * (If a read from a 64-bit integer is not atomic, you might * get a "torn" read and completely bogus latencies though!) 
*/ - for (i = 0; i < progress_nclients; i++) + initStats(&cur, 0.0); + for (i = 0; i < nthreads; i++) { - count += state[i].cnt; - lats += state[i].txn_latencies; - sqlats += state[i].txn_sqlats; - } - - for (i = 0; i < progress_nthreads; i++) - { - skipped += thread[i].throttle_latency_skipped; - lags += thread[i].throttle_lag; + mergeSimpleStats(&cur.latency, &thread[i].stats.latency); + mergeSimpleStats(&cur.lag, &thread[i].stats.lag); + cur.cnt += thread[i].stats.cnt; + cur.skipped += thread[i].stats.skipped; } total_run = (now - thread_start) / 1000000.0; - tps = 1000000.0 * (count - last_count) / run; - latency = 0.001 * (lats - last_lats) / (count - last_count); - sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count); + tps = 1000000.0 * (cur.cnt - last.cnt) / run; + latency = 0.001 * (cur.latency.sum - last.latency.sum) / + (cur.cnt - last.cnt); + sqlat = 1.0 * (cur.latency.sum2 - last.latency.sum2) + / (cur.cnt - last.cnt); stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency); - lag = 0.001 * (lags - last_lags) / (count - last_count); + lag = 0.001 * (cur.lag.sum - last.lag.sum) / + (cur.cnt - last.cnt); if (progress_timestamp) sprintf(tbuf, "%.03f s", @@ -3885,16 +3780,12 @@ threadRun(void *arg) fprintf(stderr, ", lag %.3f ms", lag); if (latency_limit) fprintf(stderr, ", " INT64_FORMAT " skipped", - skipped - last_skipped); + cur.skipped - last.skipped); } fprintf(stderr, "\n"); - last_count = count; - last_lats = lats; - last_sqlats = sqlats; - last_lags = lags; + last = cur; last_report = now; - last_skipped = skipped; /* * Ensure that the next report is in the future, in case @@ -3914,7 +3805,14 @@ done: INSTR_TIME_SET_CURRENT(end); INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start); if (logfile) + { + if (agg_interval) + { + /* log aggregated but not yet reported transactions */ + doLog(thread, state, logfile, &end, &aggs, false, 0, 0); + } fclose(logfile); + } return NULL; }