mirror of
https://git.postgresql.org/git/postgresql.git
synced 2024-10-01 14:41:24 +02:00
Remove thread-emulation support from pgbench.
You can no longer use pgbench with multiple threads when compiled without --enable-thread-safety. That's an acceptable limitation these days; it still works fine with -j1, and all modern platforms support threads anyway. This makes future maintenance and development of the code easier. Fabien Coelho
This commit is contained in:
parent
9031ff91a1
commit
1bc90f7a7b
@ -70,20 +70,8 @@ static int pthread_join(pthread_t th, void **thread_return);
|
||||
/* Use platform-dependent pthread capability */
|
||||
#include <pthread.h>
|
||||
#else
|
||||
/* Use emulation with fork. Rename pthread identifiers to avoid conflicts */
|
||||
#define PTHREAD_FORK_EMULATION
|
||||
#include <sys/wait.h>
|
||||
|
||||
#define pthread_t pg_pthread_t
|
||||
#define pthread_attr_t pg_pthread_attr_t
|
||||
#define pthread_create pg_pthread_create
|
||||
#define pthread_join pg_pthread_join
|
||||
|
||||
typedef struct fork_pthread *pthread_t;
|
||||
typedef int pthread_attr_t;
|
||||
|
||||
static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
|
||||
static int pthread_join(pthread_t th, void **thread_return);
|
||||
/* No threads implementation, use none (-j 1) */
|
||||
#define pthread_t void *
|
||||
#endif
|
||||
|
||||
|
||||
@ -210,8 +198,6 @@ typedef struct
|
||||
PGconn *con; /* connection handle to DB */
|
||||
int id; /* client No. */
|
||||
int state; /* state No. */
|
||||
int cnt; /* xacts count */
|
||||
int ecnt; /* error count */
|
||||
int listen; /* 0 indicates that an async query has been
|
||||
* sent */
|
||||
int sleeping; /* 1 indicates that the client is napping */
|
||||
@ -221,15 +207,19 @@ typedef struct
|
||||
int64 txn_scheduled; /* scheduled start time of transaction (usec) */
|
||||
instr_time txn_begin; /* used for measuring schedule lag times */
|
||||
instr_time stmt_begin; /* used for measuring statement latencies */
|
||||
int64 txn_latencies; /* cumulated latencies */
|
||||
int64 txn_sqlats; /* cumulated square latencies */
|
||||
bool is_throttled; /* whether transaction throttling is done */
|
||||
int use_file; /* index in sql_files for this client */
|
||||
bool prepared[MAX_FILES];
|
||||
|
||||
/* per client collected stats */
|
||||
int cnt; /* xacts count */
|
||||
int ecnt; /* error count */
|
||||
int64 txn_latencies; /* cumulated latencies */
|
||||
int64 txn_sqlats; /* cumulated square latencies */
|
||||
} CState;
|
||||
|
||||
/*
|
||||
* Thread state and result
|
||||
* Thread state
|
||||
*/
|
||||
typedef struct
|
||||
{
|
||||
@ -242,6 +232,9 @@ typedef struct
|
||||
int *exec_count; /* number of cmd executions (per Command) */
|
||||
unsigned short random_state[3]; /* separate randomness for each thread */
|
||||
int64 throttle_trigger; /* previous/next throttling (us) */
|
||||
|
||||
/* per thread collected stats */
|
||||
instr_time conn_time;
|
||||
int64 throttle_lag; /* total transaction lag behind throttling */
|
||||
int64 throttle_lag_max; /* max transaction lag */
|
||||
int64 throttle_latency_skipped; /* lagging transactions
|
||||
@ -251,18 +244,6 @@ typedef struct
|
||||
|
||||
#define INVALID_THREAD ((pthread_t) 0)
|
||||
|
||||
typedef struct
|
||||
{
|
||||
instr_time conn_time;
|
||||
int64 xacts;
|
||||
int64 latencies;
|
||||
int64 sqlats;
|
||||
int64 throttle_lag;
|
||||
int64 throttle_lag_max;
|
||||
int64 throttle_latency_skipped;
|
||||
int64 latency_late;
|
||||
} TResult;
|
||||
|
||||
/*
|
||||
* queries read from files
|
||||
*/
|
||||
@ -2926,6 +2907,13 @@ main(int argc, char **argv)
|
||||
fprintf(stderr, "invalid number of threads: %d\n", nthreads);
|
||||
exit(1);
|
||||
}
|
||||
#ifndef ENABLE_THREAD_SAFETY
|
||||
if (nthreads != 1)
|
||||
{
|
||||
fprintf(stderr, "threads are not supported on this platform, use -j1\n");
|
||||
exit(1);
|
||||
}
|
||||
#endif /* !ENABLE_THREAD_SAFETY */
|
||||
break;
|
||||
case 'C':
|
||||
benchmarking_option_set = true;
|
||||
@ -3194,22 +3182,6 @@ main(int argc, char **argv)
|
||||
exit(1);
|
||||
}
|
||||
|
||||
/*
|
||||
* is_latencies only works with multiple threads in thread-based
|
||||
* implementations, not fork-based ones, because it supposes that the
|
||||
* parent can see changes made to the per-thread execution stats by child
|
||||
* threads. It seems useful enough to accept despite this limitation, but
|
||||
* perhaps we should FIXME someday (by passing the stats data back up
|
||||
* through the parent-to-child pipes).
|
||||
*/
|
||||
#ifndef ENABLE_THREAD_SAFETY
|
||||
if (is_latencies && nthreads > 1)
|
||||
{
|
||||
fprintf(stderr, "-r does not work with -j larger than 1 on this platform.\n");
|
||||
exit(1);
|
||||
}
|
||||
#endif
|
||||
|
||||
/*
|
||||
* save main process id in the global variable because process id will be
|
||||
* changed after fork.
|
||||
@ -3414,6 +3386,7 @@ main(int argc, char **argv)
|
||||
setalarm(duration);
|
||||
|
||||
/* start threads */
|
||||
#ifdef ENABLE_THREAD_SAFETY
|
||||
for (i = 0; i < nthreads; i++)
|
||||
{
|
||||
TState *thread = &threads[i];
|
||||
@ -3436,32 +3409,43 @@ main(int argc, char **argv)
|
||||
thread->thread = INVALID_THREAD;
|
||||
}
|
||||
}
|
||||
#else
|
||||
INSTR_TIME_SET_CURRENT(threads[0].start_time);
|
||||
threads[0].thread = INVALID_THREAD;
|
||||
#endif /* ENABLE_THREAD_SAFETY */
|
||||
|
||||
/* wait for threads and accumulate results */
|
||||
INSTR_TIME_SET_ZERO(conn_total_time);
|
||||
for (i = 0; i < nthreads; i++)
|
||||
{
|
||||
void *ret = NULL;
|
||||
TState *thread = &threads[i];
|
||||
int j;
|
||||
|
||||
#ifdef ENABLE_THREAD_SAFETY
|
||||
if (threads[i].thread == INVALID_THREAD)
|
||||
ret = threadRun(&threads[i]);
|
||||
/* actually run this thread directly in the main thread */
|
||||
(void) threadRun(thread);
|
||||
else
|
||||
pthread_join(threads[i].thread, &ret);
|
||||
/* wait of other threads. should check that 0 is returned? */
|
||||
pthread_join(thread->thread, NULL);
|
||||
#else
|
||||
(void) threadRun(thread);
|
||||
#endif /* ENABLE_THREAD_SAFETY */
|
||||
|
||||
if (ret != NULL)
|
||||
/* thread level stats */
|
||||
throttle_lag += thread->throttle_lag;
|
||||
throttle_latency_skipped = threads->throttle_latency_skipped;
|
||||
latency_late = thread->latency_late;
|
||||
if (throttle_lag_max > thread->throttle_lag_max)
|
||||
throttle_lag_max = thread->throttle_lag_max;
|
||||
INSTR_TIME_ADD(conn_total_time, thread->conn_time);
|
||||
|
||||
/* client-level stats */
|
||||
for (j = 0; j < thread->nstate; j++)
|
||||
{
|
||||
TResult *r = (TResult *) ret;
|
||||
|
||||
total_xacts += r->xacts;
|
||||
total_latencies += r->latencies;
|
||||
total_sqlats += r->sqlats;
|
||||
throttle_lag += r->throttle_lag;
|
||||
throttle_latency_skipped += r->throttle_latency_skipped;
|
||||
latency_late += r->latency_late;
|
||||
if (r->throttle_lag_max > throttle_lag_max)
|
||||
throttle_lag_max = r->throttle_lag_max;
|
||||
INSTR_TIME_ADD(conn_total_time, r->conn_time);
|
||||
free(ret);
|
||||
total_xacts += thread->state[j].cnt;
|
||||
total_latencies += thread->state[i].txn_latencies;
|
||||
total_sqlats += thread->state[i].txn_sqlats;
|
||||
}
|
||||
}
|
||||
disconnect_all(state, nclients);
|
||||
@ -3491,7 +3475,6 @@ threadRun(void *arg)
|
||||
{
|
||||
TState *thread = (TState *) arg;
|
||||
CState *state = thread->state;
|
||||
TResult *result;
|
||||
FILE *logfile = NULL; /* per-thread log file */
|
||||
instr_time start,
|
||||
end;
|
||||
@ -3522,9 +3505,7 @@ threadRun(void *arg)
|
||||
thread->throttle_lag = 0;
|
||||
thread->throttle_lag_max = 0;
|
||||
|
||||
result = pg_malloc(sizeof(TResult));
|
||||
|
||||
INSTR_TIME_SET_ZERO(result->conn_time);
|
||||
INSTR_TIME_SET_ZERO(thread->conn_time);
|
||||
|
||||
/* open log file if requested */
|
||||
if (use_log)
|
||||
@ -3555,8 +3536,8 @@ threadRun(void *arg)
|
||||
}
|
||||
|
||||
/* time after thread and connections set up */
|
||||
INSTR_TIME_SET_CURRENT(result->conn_time);
|
||||
INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time);
|
||||
INSTR_TIME_SET_CURRENT(thread->conn_time);
|
||||
INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
|
||||
|
||||
agg_vals_init(&aggs, thread->start_time);
|
||||
|
||||
@ -3568,7 +3549,7 @@ threadRun(void *arg)
|
||||
int prev_ecnt = st->ecnt;
|
||||
|
||||
st->use_file = getrand(thread, 0, num_files - 1);
|
||||
if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
|
||||
if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
|
||||
remains--; /* I've aborted */
|
||||
|
||||
if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
|
||||
@ -3650,11 +3631,7 @@ threadRun(void *arg)
|
||||
}
|
||||
|
||||
/* also wake up to print the next progress report on time */
|
||||
if (progress && min_usec > 0
|
||||
#if !defined(PTHREAD_FORK_EMULATION)
|
||||
&& thread->tid == 0
|
||||
#endif /* !PTHREAD_FORK_EMULATION */
|
||||
)
|
||||
if (progress && min_usec > 0)
|
||||
{
|
||||
/* get current time if needed */
|
||||
if (now_usec == 0)
|
||||
@ -3710,7 +3687,7 @@ threadRun(void *arg)
|
||||
if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
|
||||
|| commands[st->state]->type == META_COMMAND))
|
||||
{
|
||||
if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
|
||||
if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
|
||||
remains--; /* I've aborted */
|
||||
}
|
||||
|
||||
@ -3723,76 +3700,6 @@ threadRun(void *arg)
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef PTHREAD_FORK_EMULATION
|
||||
/* each process reports its own progression */
|
||||
if (progress)
|
||||
{
|
||||
instr_time now_time;
|
||||
int64 now;
|
||||
|
||||
INSTR_TIME_SET_CURRENT(now_time);
|
||||
now = INSTR_TIME_GET_MICROSEC(now_time);
|
||||
if (now >= next_report)
|
||||
{
|
||||
/* generate and show report */
|
||||
int64 count = 0,
|
||||
lats = 0,
|
||||
sqlats = 0,
|
||||
skipped = 0;
|
||||
int64 lags = thread->throttle_lag;
|
||||
int64 run = now - last_report;
|
||||
double tps,
|
||||
total_run,
|
||||
latency,
|
||||
sqlat,
|
||||
stdev,
|
||||
lag;
|
||||
|
||||
for (i = 0; i < nstate; i++)
|
||||
{
|
||||
count += state[i].cnt;
|
||||
lats += state[i].txn_latencies;
|
||||
sqlats += state[i].txn_sqlats;
|
||||
}
|
||||
|
||||
total_run = (now - thread_start) / 1000000.0;
|
||||
tps = 1000000.0 * (count - last_count) / run;
|
||||
latency = 0.001 * (lats - last_lats) / (count - last_count);
|
||||
sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
|
||||
stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
|
||||
lag = 0.001 * (lags - last_lags) / (count - last_count);
|
||||
skipped = thread->throttle_latency_skipped - last_skipped;
|
||||
|
||||
fprintf(stderr,
|
||||
"progress %d: %.1f s, %.1f tps, "
|
||||
"lat %.3f ms stddev %.3f",
|
||||
thread->tid, total_run, tps, latency, stdev);
|
||||
if (throttle_delay)
|
||||
{
|
||||
fprintf(stderr, ", lag %.3f ms", lag);
|
||||
if (latency_limit)
|
||||
fprintf(stderr, ", skipped " INT64_FORMAT, skipped);
|
||||
}
|
||||
fprintf(stderr, "\n");
|
||||
|
||||
last_count = count;
|
||||
last_lats = lats;
|
||||
last_sqlats = sqlats;
|
||||
last_lags = lags;
|
||||
last_report = now;
|
||||
last_skipped = thread->throttle_latency_skipped;
|
||||
|
||||
/*
|
||||
* Ensure that the next report is in the future, in case
|
||||
* pgbench/postgres got stuck somewhere.
|
||||
*/
|
||||
do
|
||||
{
|
||||
next_report += (int64) progress *1000000;
|
||||
} while (now >= next_report);
|
||||
}
|
||||
}
|
||||
#else
|
||||
/* progress report by thread 0 for all threads */
|
||||
if (progress && thread->tid == 0)
|
||||
{
|
||||
@ -3817,6 +3724,17 @@ threadRun(void *arg)
|
||||
lag,
|
||||
stdev;
|
||||
|
||||
/*
|
||||
* Add up the statistics of all threads.
|
||||
*
|
||||
* XXX: No locking. There is no guarantee that we get an
|
||||
* atomic snapshot of the transaction count and latencies, so
|
||||
* these figures can well be off by a small amount. The
|
||||
* progress is report's purpose is to give a quick overview of
|
||||
* how the test is going, so that shouldn't matter too much.
|
||||
* (If a read from a 64-bit integer is not atomic, you might
|
||||
* get a "torn" read and completely bogus latencies though!)
|
||||
*/
|
||||
for (i = 0; i < progress_nclients; i++)
|
||||
{
|
||||
count += state[i].cnt;
|
||||
@ -3864,31 +3782,16 @@ threadRun(void *arg)
|
||||
} while (now >= next_report);
|
||||
}
|
||||
}
|
||||
#endif /* PTHREAD_FORK_EMULATION */
|
||||
}
|
||||
|
||||
done:
|
||||
INSTR_TIME_SET_CURRENT(start);
|
||||
disconnect_all(state, nstate);
|
||||
result->xacts = 0;
|
||||
result->latencies = 0;
|
||||
result->sqlats = 0;
|
||||
for (i = 0; i < nstate; i++)
|
||||
{
|
||||
result->xacts += state[i].cnt;
|
||||
result->latencies += state[i].txn_latencies;
|
||||
result->sqlats += state[i].txn_sqlats;
|
||||
}
|
||||
result->throttle_lag = thread->throttle_lag;
|
||||
result->throttle_lag_max = thread->throttle_lag_max;
|
||||
result->throttle_latency_skipped = thread->throttle_latency_skipped;
|
||||
result->latency_late = thread->latency_late;
|
||||
|
||||
INSTR_TIME_SET_CURRENT(end);
|
||||
INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
|
||||
INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
|
||||
if (logfile)
|
||||
fclose(logfile);
|
||||
return result;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -3910,90 +3813,6 @@ setalarm(int seconds)
|
||||
alarm(seconds);
|
||||
}
|
||||
|
||||
#ifndef ENABLE_THREAD_SAFETY
|
||||
|
||||
/*
|
||||
* implements pthread using fork.
|
||||
*/
|
||||
|
||||
typedef struct fork_pthread
|
||||
{
|
||||
pid_t pid;
|
||||
int pipes[2];
|
||||
} fork_pthread;
|
||||
|
||||
static int
|
||||
pthread_create(pthread_t *thread,
|
||||
pthread_attr_t *attr,
|
||||
void *(*start_routine) (void *),
|
||||
void *arg)
|
||||
{
|
||||
fork_pthread *th;
|
||||
void *ret;
|
||||
int rc;
|
||||
|
||||
th = (fork_pthread *) pg_malloc(sizeof(fork_pthread));
|
||||
if (pipe(th->pipes) < 0)
|
||||
{
|
||||
free(th);
|
||||
return errno;
|
||||
}
|
||||
|
||||
th->pid = fork();
|
||||
if (th->pid == -1) /* error */
|
||||
{
|
||||
free(th);
|
||||
return errno;
|
||||
}
|
||||
if (th->pid != 0) /* in parent process */
|
||||
{
|
||||
close(th->pipes[1]);
|
||||
*thread = th;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* in child process */
|
||||
close(th->pipes[0]);
|
||||
|
||||
/* set alarm again because the child does not inherit timers */
|
||||
if (duration > 0)
|
||||
setalarm(duration);
|
||||
|
||||
ret = start_routine(arg);
|
||||
rc = write(th->pipes[1], ret, sizeof(TResult));
|
||||
(void) rc;
|
||||
close(th->pipes[1]);
|
||||
free(th);
|
||||
exit(0);
|
||||
}
|
||||
|
||||
static int
|
||||
pthread_join(pthread_t th, void **thread_return)
|
||||
{
|
||||
int status;
|
||||
|
||||
while (waitpid(th->pid, &status, 0) != th->pid)
|
||||
{
|
||||
if (errno != EINTR)
|
||||
return errno;
|
||||
}
|
||||
|
||||
if (thread_return != NULL)
|
||||
{
|
||||
/* assume result is TResult */
|
||||
*thread_return = pg_malloc(sizeof(TResult));
|
||||
if (read(th->pipes[0], *thread_return, sizeof(TResult)) != sizeof(TResult))
|
||||
{
|
||||
free(*thread_return);
|
||||
*thread_return = NULL;
|
||||
}
|
||||
}
|
||||
close(th->pipes[0]);
|
||||
|
||||
free(th);
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
#else /* WIN32 */
|
||||
|
||||
static VOID CALLBACK
|
||||
|
Loading…
Reference in New Issue
Block a user