Remove thread-emulation support from pgbench.

You can no longer use pgbench with multiple threads when compiled without
--enable-thread-safety. That's an acceptable limitation these days; it
still works fine with -j1, and all modern platforms support threads anyway.
This makes future maintenance and development of the code easier.

Fabien Coelho
This commit is contained in:
Heikki Linnakangas 2015-07-03 11:48:54 +03:00
parent 9031ff91a1
commit 1bc90f7a7b
1 changed files with 66 additions and 247 deletions

View File

@ -70,20 +70,8 @@ static int pthread_join(pthread_t th, void **thread_return);
/* Use platform-dependent pthread capability */
#include <pthread.h>
#else
/* Use emulation with fork. Rename pthread identifiers to avoid conflicts */
#define PTHREAD_FORK_EMULATION
#include <sys/wait.h>
#define pthread_t pg_pthread_t
#define pthread_attr_t pg_pthread_attr_t
#define pthread_create pg_pthread_create
#define pthread_join pg_pthread_join
typedef struct fork_pthread *pthread_t;
typedef int pthread_attr_t;
static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
static int pthread_join(pthread_t th, void **thread_return);
/* No threads implementation, use none (-j 1) */
#define pthread_t void *
#endif
@ -210,8 +198,6 @@ typedef struct
PGconn *con; /* connection handle to DB */
int id; /* client No. */
int state; /* state No. */
int cnt; /* xacts count */
int ecnt; /* error count */
int listen; /* 0 indicates that an async query has been
* sent */
int sleeping; /* 1 indicates that the client is napping */
@ -221,15 +207,19 @@ typedef struct
int64 txn_scheduled; /* scheduled start time of transaction (usec) */
instr_time txn_begin; /* used for measuring schedule lag times */
instr_time stmt_begin; /* used for measuring statement latencies */
int64 txn_latencies; /* cumulated latencies */
int64 txn_sqlats; /* cumulated square latencies */
bool is_throttled; /* whether transaction throttling is done */
int use_file; /* index in sql_files for this client */
bool prepared[MAX_FILES];
/* per client collected stats */
int cnt; /* xacts count */
int ecnt; /* error count */
int64 txn_latencies; /* cumulated latencies */
int64 txn_sqlats; /* cumulated square latencies */
} CState;
/*
* Thread state and result
* Thread state
*/
typedef struct
{
@ -242,6 +232,9 @@ typedef struct
int *exec_count; /* number of cmd executions (per Command) */
unsigned short random_state[3]; /* separate randomness for each thread */
int64 throttle_trigger; /* previous/next throttling (us) */
/* per thread collected stats */
instr_time conn_time;
int64 throttle_lag; /* total transaction lag behind throttling */
int64 throttle_lag_max; /* max transaction lag */
int64 throttle_latency_skipped; /* lagging transactions
@ -251,18 +244,6 @@ typedef struct
#define INVALID_THREAD ((pthread_t) 0)
typedef struct
{
instr_time conn_time;
int64 xacts;
int64 latencies;
int64 sqlats;
int64 throttle_lag;
int64 throttle_lag_max;
int64 throttle_latency_skipped;
int64 latency_late;
} TResult;
/*
* queries read from files
*/
@ -2926,6 +2907,13 @@ main(int argc, char **argv)
fprintf(stderr, "invalid number of threads: %d\n", nthreads);
exit(1);
}
#ifndef ENABLE_THREAD_SAFETY
if (nthreads != 1)
{
fprintf(stderr, "threads are not supported on this platform, use -j1\n");
exit(1);
}
#endif /* !ENABLE_THREAD_SAFETY */
break;
case 'C':
benchmarking_option_set = true;
@ -3194,22 +3182,6 @@ main(int argc, char **argv)
exit(1);
}
/*
* is_latencies only works with multiple threads in thread-based
* implementations, not fork-based ones, because it supposes that the
* parent can see changes made to the per-thread execution stats by child
* threads. It seems useful enough to accept despite this limitation, but
* perhaps we should FIXME someday (by passing the stats data back up
* through the parent-to-child pipes).
*/
#ifndef ENABLE_THREAD_SAFETY
if (is_latencies && nthreads > 1)
{
fprintf(stderr, "-r does not work with -j larger than 1 on this platform.\n");
exit(1);
}
#endif
/*
* save main process id in the global variable because process id will be
* changed after fork.
@ -3414,6 +3386,7 @@ main(int argc, char **argv)
setalarm(duration);
/* start threads */
#ifdef ENABLE_THREAD_SAFETY
for (i = 0; i < nthreads; i++)
{
TState *thread = &threads[i];
@ -3436,32 +3409,43 @@ main(int argc, char **argv)
thread->thread = INVALID_THREAD;
}
}
#else
INSTR_TIME_SET_CURRENT(threads[0].start_time);
threads[0].thread = INVALID_THREAD;
#endif /* ENABLE_THREAD_SAFETY */
/* wait for threads and accumulate results */
INSTR_TIME_SET_ZERO(conn_total_time);
for (i = 0; i < nthreads; i++)
{
void *ret = NULL;
TState *thread = &threads[i];
int j;
#ifdef ENABLE_THREAD_SAFETY
if (threads[i].thread == INVALID_THREAD)
ret = threadRun(&threads[i]);
/* actually run this thread directly in the main thread */
(void) threadRun(thread);
else
pthread_join(threads[i].thread, &ret);
/* wait for other threads. should check that 0 is returned? */
pthread_join(thread->thread, NULL);
#else
(void) threadRun(thread);
#endif /* ENABLE_THREAD_SAFETY */
if (ret != NULL)
/* thread level stats */
throttle_lag += thread->throttle_lag;
throttle_latency_skipped = threads->throttle_latency_skipped;
latency_late = thread->latency_late;
if (throttle_lag_max > thread->throttle_lag_max)
throttle_lag_max = thread->throttle_lag_max;
INSTR_TIME_ADD(conn_total_time, thread->conn_time);
/* client-level stats */
for (j = 0; j < thread->nstate; j++)
{
TResult *r = (TResult *) ret;
total_xacts += r->xacts;
total_latencies += r->latencies;
total_sqlats += r->sqlats;
throttle_lag += r->throttle_lag;
throttle_latency_skipped += r->throttle_latency_skipped;
latency_late += r->latency_late;
if (r->throttle_lag_max > throttle_lag_max)
throttle_lag_max = r->throttle_lag_max;
INSTR_TIME_ADD(conn_total_time, r->conn_time);
free(ret);
total_xacts += thread->state[j].cnt;
total_latencies += thread->state[i].txn_latencies;
total_sqlats += thread->state[i].txn_sqlats;
}
}
disconnect_all(state, nclients);
@ -3491,7 +3475,6 @@ threadRun(void *arg)
{
TState *thread = (TState *) arg;
CState *state = thread->state;
TResult *result;
FILE *logfile = NULL; /* per-thread log file */
instr_time start,
end;
@ -3522,9 +3505,7 @@ threadRun(void *arg)
thread->throttle_lag = 0;
thread->throttle_lag_max = 0;
result = pg_malloc(sizeof(TResult));
INSTR_TIME_SET_ZERO(result->conn_time);
INSTR_TIME_SET_ZERO(thread->conn_time);
/* open log file if requested */
if (use_log)
@ -3555,8 +3536,8 @@ threadRun(void *arg)
}
/* time after thread and connections set up */
INSTR_TIME_SET_CURRENT(result->conn_time);
INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time);
INSTR_TIME_SET_CURRENT(thread->conn_time);
INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
agg_vals_init(&aggs, thread->start_time);
@ -3568,7 +3549,7 @@ threadRun(void *arg)
int prev_ecnt = st->ecnt;
st->use_file = getrand(thread, 0, num_files - 1);
if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
remains--; /* I've aborted */
if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
@ -3650,11 +3631,7 @@ threadRun(void *arg)
}
/* also wake up to print the next progress report on time */
if (progress && min_usec > 0
#if !defined(PTHREAD_FORK_EMULATION)
&& thread->tid == 0
#endif /* !PTHREAD_FORK_EMULATION */
)
if (progress && min_usec > 0)
{
/* get current time if needed */
if (now_usec == 0)
@ -3710,7 +3687,7 @@ threadRun(void *arg)
if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
|| commands[st->state]->type == META_COMMAND))
{
if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
remains--; /* I've aborted */
}
@ -3723,76 +3700,6 @@ threadRun(void *arg)
}
}
#ifdef PTHREAD_FORK_EMULATION
/* each process reports its own progression */
if (progress)
{
instr_time now_time;
int64 now;
INSTR_TIME_SET_CURRENT(now_time);
now = INSTR_TIME_GET_MICROSEC(now_time);
if (now >= next_report)
{
/* generate and show report */
int64 count = 0,
lats = 0,
sqlats = 0,
skipped = 0;
int64 lags = thread->throttle_lag;
int64 run = now - last_report;
double tps,
total_run,
latency,
sqlat,
stdev,
lag;
for (i = 0; i < nstate; i++)
{
count += state[i].cnt;
lats += state[i].txn_latencies;
sqlats += state[i].txn_sqlats;
}
total_run = (now - thread_start) / 1000000.0;
tps = 1000000.0 * (count - last_count) / run;
latency = 0.001 * (lats - last_lats) / (count - last_count);
sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
lag = 0.001 * (lags - last_lags) / (count - last_count);
skipped = thread->throttle_latency_skipped - last_skipped;
fprintf(stderr,
"progress %d: %.1f s, %.1f tps, "
"lat %.3f ms stddev %.3f",
thread->tid, total_run, tps, latency, stdev);
if (throttle_delay)
{
fprintf(stderr, ", lag %.3f ms", lag);
if (latency_limit)
fprintf(stderr, ", skipped " INT64_FORMAT, skipped);
}
fprintf(stderr, "\n");
last_count = count;
last_lats = lats;
last_sqlats = sqlats;
last_lags = lags;
last_report = now;
last_skipped = thread->throttle_latency_skipped;
/*
* Ensure that the next report is in the future, in case
* pgbench/postgres got stuck somewhere.
*/
do
{
next_report += (int64) progress *1000000;
} while (now >= next_report);
}
}
#else
/* progress report by thread 0 for all threads */
if (progress && thread->tid == 0)
{
@ -3817,6 +3724,17 @@ threadRun(void *arg)
lag,
stdev;
/*
* Add up the statistics of all threads.
*
* XXX: No locking. There is no guarantee that we get an
* atomic snapshot of the transaction count and latencies, so
* these figures can well be off by a small amount. The
* progress report's purpose is to give a quick overview of
* how the test is going, so that shouldn't matter too much.
* (If a read from a 64-bit integer is not atomic, you might
* get a "torn" read and completely bogus latencies though!)
*/
for (i = 0; i < progress_nclients; i++)
{
count += state[i].cnt;
@ -3864,31 +3782,16 @@ threadRun(void *arg)
} while (now >= next_report);
}
}
#endif /* PTHREAD_FORK_EMULATION */
}
done:
INSTR_TIME_SET_CURRENT(start);
disconnect_all(state, nstate);
result->xacts = 0;
result->latencies = 0;
result->sqlats = 0;
for (i = 0; i < nstate; i++)
{
result->xacts += state[i].cnt;
result->latencies += state[i].txn_latencies;
result->sqlats += state[i].txn_sqlats;
}
result->throttle_lag = thread->throttle_lag;
result->throttle_lag_max = thread->throttle_lag_max;
result->throttle_latency_skipped = thread->throttle_latency_skipped;
result->latency_late = thread->latency_late;
INSTR_TIME_SET_CURRENT(end);
INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
if (logfile)
fclose(logfile);
return result;
return NULL;
}
/*
@ -3910,90 +3813,6 @@ setalarm(int seconds)
alarm(seconds);
}
#ifndef ENABLE_THREAD_SAFETY
/*
* implements pthread using fork.
*/
typedef struct fork_pthread
{
pid_t pid;
int pipes[2];
} fork_pthread;
static int
pthread_create(pthread_t *thread,
pthread_attr_t *attr,
void *(*start_routine) (void *),
void *arg)
{
fork_pthread *th;
void *ret;
int rc;
th = (fork_pthread *) pg_malloc(sizeof(fork_pthread));
if (pipe(th->pipes) < 0)
{
free(th);
return errno;
}
th->pid = fork();
if (th->pid == -1) /* error */
{
free(th);
return errno;
}
if (th->pid != 0) /* in parent process */
{
close(th->pipes[1]);
*thread = th;
return 0;
}
/* in child process */
close(th->pipes[0]);
/* set alarm again because the child does not inherit timers */
if (duration > 0)
setalarm(duration);
ret = start_routine(arg);
rc = write(th->pipes[1], ret, sizeof(TResult));
(void) rc;
close(th->pipes[1]);
free(th);
exit(0);
}
static int
pthread_join(pthread_t th, void **thread_return)
{
int status;
while (waitpid(th->pid, &status, 0) != th->pid)
{
if (errno != EINTR)
return errno;
}
if (thread_return != NULL)
{
/* assume result is TResult */
*thread_return = pg_malloc(sizeof(TResult));
if (read(th->pipes[0], *thread_return, sizeof(TResult)) != sizeof(TResult))
{
free(*thread_return);
*thread_return = NULL;
}
}
close(th->pipes[0]);
free(th);
return 0;
}
#endif
#else /* WIN32 */
static VOID CALLBACK