From 3da0dfb4b1460c3701abc8ed5f516d138dc4654c Mon Sep 17 00:00:00 2001 From: Tatsuo Ishii Date: Mon, 3 Aug 2009 15:18:14 +0000 Subject: [PATCH] Multi-threaded version of pgbench contributed by ITAGAKI Takahiro, reviewed by Greg Smith and Josh Williams. The following is the proposal from ITAGAKI Takahiro: Pgbench is a well-known tool for measuring PostgreSQL performance, but nowadays it does not work well because it cannot use multiple CPUs. On the other hand, the PostgreSQL server can use CPUs very well, so the bottleneck of the workload is *in pgbench*. Multi-threading would be a solution. The attached patch adds a -j (number of jobs) option to pgbench. If the value N is greater than 1, pgbench runs with N threads. Connections are divided equally among them (e.g. -c64 -j4 => 4 threads with 16 connections each). It can run on POSIX platforms with pthread and on Windows with win32 threads. Here are the results of multi-threaded pgbench runs on Fedora 11 with an Intel Core i7 (8 logical cores = 4 physical cores * HT). -j8 (8 threads) was the best, and its tps is 4.5 times that of -j1, which is the traditional (single-threaded) result. $ pgbench -i -s10 $ pgbench -n -S -c64 -j1 => tps = 11600.158593 $ pgbench -n -S -c64 -j2 => tps = 17947.100954 $ pgbench -n -S -c64 -j4 => tps = 26571.124001 $ pgbench -n -S -c64 -j8 => tps = 52725.470403 $ pgbench -n -S -c64 -j16 => tps = 38976.675319 $ pgbench -n -S -c64 -j32 => tps = 28998.499601 $ pgbench -n -S -c64 -j64 => tps = 26701.877815 Is it acceptable to use pthread in a contrib module? If OK, I will add the patch to the next commitfest. 
--- contrib/pgbench/pgbench.c | 790 ++++++++++++++++++++++++-------------- doc/src/sgml/pgbench.sgml | 10 +- 2 files changed, 520 insertions(+), 280 deletions(-) diff --git a/contrib/pgbench/pgbench.c b/contrib/pgbench/pgbench.c index 7ede6954aa..0c3704a2ff 100644 --- a/contrib/pgbench/pgbench.c +++ b/contrib/pgbench/pgbench.c @@ -4,7 +4,7 @@ * A simple benchmark program for PostgreSQL * Originally written by Tatsuo Ishii and enhanced by many contributors. * - * $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.88 2009/07/30 09:28:00 mha Exp $ + * $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.89 2009/08/03 15:18:14 ishii Exp $ * Copyright (c) 2000-2009, PostgreSQL Global Development Group * ALL RIGHTS RESERVED; * @@ -35,6 +35,7 @@ #include "libpq-fe.h" #include "pqsignal.h" +#include "portability/instr_time.h" #include @@ -58,6 +59,40 @@ #include /* for getrlimit */ #endif +#ifndef INT64_MAX +#define INT64_MAX INT64CONST(0x7FFFFFFFFFFFFFFF) +#endif + +/* + * Multi-platform pthread implementations + */ + +#ifdef WIN32 +/* Use native win32 threads on Windows */ +typedef struct win32_pthread *pthread_t; +typedef int pthread_attr_t; +static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void * (*start_routine)(void *), void *arg); +static int pthread_join(pthread_t th, void **thread_return); + +#elif defined(ENABLE_THREAD_SAFETY) +/* Use platform-dependent pthread */ +#include + +#else + +#include +/* Use emulation with fork. 
Rename pthread idendifiers to avoid conflictions */ +#define pthread_t pg_pthread_t +#define pthread_attr_t pg_pthread_attr_t +#define pthread_create pg_pthread_create +#define pthread_join pg_pthread_join +typedef struct fork_pthread *pthread_t; +typedef int pthread_attr_t; +static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void * (*start_routine)(void *), void *arg); +static int pthread_join(pthread_t th, void **thread_return); + +#endif + extern char *optarg; extern int optind; @@ -74,7 +109,6 @@ extern int optind; #define DEFAULT_NXACTS 10 /* default nxacts */ -int nclients = 1; /* default number of simulated clients */ int nxacts = 0; /* number of transactions per client */ int duration = 0; /* duration in seconds */ @@ -102,8 +136,6 @@ FILE *LOGFILE = NULL; bool use_log; /* log transaction latencies to a file */ -int remains; /* number of remaining clients */ - int is_connect; /* establish connection for each transaction */ char *pghost = ""; @@ -138,14 +170,33 @@ typedef struct int listen; /* 0 indicates that an async query has been * sent */ int sleeping; /* 1 indicates that the client is napping */ - struct timeval until; /* napping until */ + int64 until; /* napping until (usec) */ Variable *variables; /* array of variable definitions */ int nvariables; - struct timeval txn_begin; /* used for measuring latencies */ + instr_time txn_begin; /* used for measuring latencies */ int use_file; /* index in sql_files for this client */ bool prepared[MAX_FILES]; } CState; +/* + * Thread state and result + */ +typedef struct +{ + pthread_t thread; /* thread handle */ + CState *state; /* array of CState */ + int nstate; /* length of state */ + instr_time start_time; /* thread start time */ +} TState; + +#define INVALID_THREAD ((pthread_t) 0) + +typedef struct +{ + instr_time conn_time; + int xacts; +} TResult; + /* * queries read from files */ @@ -171,8 +222,9 @@ typedef struct char *argv[MAX_ARGS]; /* command list */ } Command; -Command 
**sql_files[MAX_FILES]; /* SQL script files */ -int num_files; /* number of script files */ +static Command **sql_files[MAX_FILES]; /* SQL script files */ +static int num_files; /* number of script files */ +static int debug = 0; /* debug flag */ /* default scenario */ static char *tpc_b = { @@ -215,44 +267,9 @@ static char *select_only = { "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n" }; -/* Connection overhead time */ -static struct timeval conn_total_time = {0, 0}; - /* Function prototypes */ static void setalarm(int seconds); - - -/* Calculate total time */ -static void -addTime(struct timeval * t1, struct timeval * t2, struct timeval * result) -{ - int sec = t1->tv_sec + t2->tv_sec; - int usec = t1->tv_usec + t2->tv_usec; - - if (usec >= 1000000) - { - usec -= 1000000; - sec++; - } - result->tv_sec = sec; - result->tv_usec = usec; -} - -/* Calculate time difference */ -static void -diffTime(struct timeval * t1, struct timeval * t2, struct timeval * result) -{ - int sec = t1->tv_sec - t2->tv_sec; - int usec = t1->tv_usec - t2->tv_usec; - - if (usec < 0) - { - usec += 1000000; - sec--; - } - result->tv_sec = sec; - result->tv_usec = usec; -} +static void* threadRun(void *arg); static void usage(const char *progname) @@ -270,6 +287,7 @@ usage(const char *progname) " -D VARNAME=VALUE\n" " define variable for use by custom script\n" " -f FILENAME read transaction script from FILENAME\n" + " -j NUM number of threads (default: 1)\n" " -l write transaction times to log file\n" " -M {simple|extended|prepared}\n" " protocol for submitting queries to server (default: simple)\n" @@ -379,29 +397,6 @@ discard_response(CState *state) } while (res); } -/* check to see if the SQL result was good */ -static int -check(CState *state, PGresult *res, int n) -{ - CState *st = &state[n]; - - switch (PQresultStatus(res)) - { - case PGRES_COMMAND_OK: - case PGRES_TUPLES_OK: - /* OK */ - break; - default: - fprintf(stderr, "Client %d aborted in state %d: %s", - n, 
st->state, PQerrorMessage(st->con)); - remains--; /* I've aborted */ - PQfinish(st->con); - st->con = NULL; - return (-1); - } - return (0); /* OK */ -} - static int compareVariables(const void *v1, const void *v2) { @@ -598,11 +593,24 @@ preparedStatementName(char *buffer, int file, int state) sprintf(buffer, "P%d_%d", file, state); } -static void -doCustom(CState *state, int n, int debug) +static bool +clientDone(CState *st, bool ok) +{ + (void) ok; /* unused */ + + if (st->con != NULL) + { + PQfinish(st->con); + st->con = NULL; + } + return false; /* always false */ +} + +/* return false iff client should be disconnected */ +static bool +doCustom(CState *st, instr_time *conn_time) { PGresult *res; - CState *st = &state[n]; Command **commands; top: @@ -610,16 +618,13 @@ top: if (st->sleeping) { /* are we sleeping? */ - int usec; - struct timeval now; + instr_time now; - gettimeofday(&now, NULL); - usec = (st->until.tv_sec - now.tv_sec) * 1000000 + - st->until.tv_usec - now.tv_usec; - if (usec <= 0) + INSTR_TIME_SET_CURRENT(now); + if (st->until <= INSTR_TIME_GET_MICROSEC(now)) st->sleeping = 0; /* Done sleeping, go ahead with next command */ else - return; /* Still sleeping, nothing to do here */ + return true; /* Still sleeping, nothing to do here */ } if (st->listen) @@ -627,17 +632,14 @@ top: if (commands[st->state]->type == SQL_COMMAND) { if (debug) - fprintf(stderr, "client %d receiving\n", n); + fprintf(stderr, "client %d receiving\n", st->id); if (!PQconsumeInput(st->con)) { /* there's something wrong */ - fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state); - remains--; /* I've aborted */ - PQfinish(st->con); - st->con = NULL; - return; + fprintf(stderr, "Client %d aborted in state %d. 
Probably the backend died while processing.\n", st->id, st->state); + return clientDone(st, false); } if (PQisBusy(st->con)) - return; /* don't have the whole result yet */ + return true; /* don't have the whole result yet */ } /* @@ -645,25 +647,35 @@ top: */ if (use_log && commands[st->state + 1] == NULL) { - double diff; - struct timeval now; + instr_time diff; + double sec; + double msec; + double usec; - gettimeofday(&now, NULL); - diff = (int) (now.tv_sec - st->txn_begin.tv_sec) * 1000000.0 + - (int) (now.tv_usec - st->txn_begin.tv_usec); + INSTR_TIME_SET_CURRENT(diff); + INSTR_TIME_SUBTRACT(diff, st->txn_begin); + sec = INSTR_TIME_GET_DOUBLE(diff); + msec = INSTR_TIME_GET_MILLISEC(diff); + usec = (double) INSTR_TIME_GET_MICROSEC(diff); - fprintf(LOGFILE, "%d %d %.0f %d %ld %ld\n", - st->id, st->cnt, diff, st->use_file, - (long) now.tv_sec, (long) now.tv_usec); + fprintf(LOGFILE, "%d %d %.0f %d %.0f %.0f\n", + st->id, st->cnt, usec, st->use_file, + sec, usec - sec * 1000.0); } if (commands[st->state]->type == SQL_COMMAND) { res = PQgetResult(st->con); - if (check(state, res, n)) + switch (PQresultStatus(res)) { - PQclear(res); - return; + case PGRES_COMMAND_OK: + case PGRES_TUPLES_OK: + break; /* OK */ + default: + fprintf(stderr, "Client %d aborted in state %d: %s", + st->id, st->state, PQerrorMessage(st->con)); + PQclear(res); + return clientDone(st, false); } PQclear(res); discard_response(st); @@ -679,15 +691,7 @@ top: ++st->cnt; if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded) - { - remains--; /* I've done */ - if (st->con != NULL) - { - PQfinish(st->con); - st->con = NULL; - } - return; - } + return clientDone(st, true); /* exit success */ } /* increment state counter */ @@ -702,27 +706,20 @@ top: if (st->con == NULL) { - struct timeval t1, - t2, - t3; + instr_time start, end; - gettimeofday(&t1, NULL); + INSTR_TIME_SET_CURRENT(start); if ((st->con = doConnect()) == NULL) { - fprintf(stderr, "Client %d aborted in establishing connection.\n", 
- n); - remains--; /* I've aborted */ - PQfinish(st->con); - st->con = NULL; - return; + fprintf(stderr, "Client %d aborted in establishing connection.\n", st->id); + return clientDone(st, false); } - gettimeofday(&t2, NULL); - diffTime(&t2, &t1, &t3); - addTime(&conn_total_time, &t3, &conn_total_time); + INSTR_TIME_SET_CURRENT(end); + INSTR_TIME_ACCUM_DIFF(*conn_time, end, start); } if (use_log && st->state == 0) - gettimeofday(&(st->txn_begin), NULL); + INSTR_TIME_SET_CURRENT(st->txn_begin); if (commands[st->state]->type == SQL_COMMAND) { @@ -738,11 +735,11 @@ top: { fprintf(stderr, "out of memory\n"); st->ecnt++; - return; + return true; } if (debug) - fprintf(stderr, "client %d sending %s\n", n, sql); + fprintf(stderr, "client %d sending %s\n", st->id, sql); r = PQsendQuery(st->con, sql); free(sql); } @@ -754,7 +751,7 @@ top: getQueryParams(st, command, params); if (debug) - fprintf(stderr, "client %d sending %s\n", n, sql); + fprintf(stderr, "client %d sending %s\n", st->id, sql); r = PQsendQueryParams(st->con, sql, command->argc - 1, NULL, params, NULL, NULL, 0); } @@ -788,7 +785,7 @@ top: preparedStatementName(name, st->use_file, st->state); if (debug) - fprintf(stderr, "client %d sending %s\n", n, name); + fprintf(stderr, "client %d sending %s\n", st->id, name); r = PQsendQueryPrepared(st->con, name, command->argc - 1, params, NULL, NULL, 0); } @@ -798,7 +795,7 @@ top: if (r == 0) { if (debug) - fprintf(stderr, "client %d cannot send %s\n", n, command->argv[0]); + fprintf(stderr, "client %d cannot send %s\n", st->id, command->argv[0]); st->ecnt++; } else @@ -812,7 +809,7 @@ top: if (debug) { - fprintf(stderr, "client %d executing \\%s", n, argv[0]); + fprintf(stderr, "client %d executing \\%s", st->id, argv[0]); for (i = 1; i < argc; i++) fprintf(stderr, " %s", argv[i]); fprintf(stderr, "\n"); @@ -831,7 +828,7 @@ top: { fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]); st->ecnt++; - return; + return true; } min = atoi(var); } @@ -853,7 
+850,7 @@ top: { fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]); st->ecnt++; - return; + return true; } max = atoi(var); } @@ -864,7 +861,7 @@ top: { fprintf(stderr, "%s: invalid maximum number %d\n", argv[0], max); st->ecnt++; - return; + return true; } #ifdef DEBUG @@ -876,7 +873,7 @@ top: { fprintf(stderr, "%s: out of memory\n", argv[0]); st->ecnt++; - return; + return true; } st->listen = 1; @@ -894,7 +891,7 @@ top: { fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]); st->ecnt++; - return; + return true; } ope1 = atoi(var); } @@ -911,7 +908,7 @@ top: { fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]); st->ecnt++; - return; + return true; } ope2 = atoi(var); } @@ -930,7 +927,7 @@ top: { fprintf(stderr, "%s: division by zero\n", argv[0]); st->ecnt++; - return; + return true; } snprintf(res, sizeof(res), "%d", ope1 / ope2); } @@ -938,7 +935,7 @@ top: { fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]); st->ecnt++; - return; + return true; } } @@ -946,7 +943,7 @@ top: { fprintf(stderr, "%s: out of memory\n", argv[0]); st->ecnt++; - return; + return true; } st->listen = 1; @@ -955,7 +952,7 @@ top: { char *var; int usec; - struct timeval now; + instr_time now; if (*argv[1] == ':') { @@ -963,7 +960,7 @@ top: { fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]); st->ecnt++; - return; + return true; } usec = atoi(var); } @@ -980,9 +977,8 @@ top: else usec *= 1000000; - gettimeofday(&now, NULL); - st->until.tv_sec = now.tv_sec + (now.tv_usec + usec) / 1000000; - st->until.tv_usec = (now.tv_usec + usec) % 1000000; + INSTR_TIME_SET_CURRENT(now); + st->until = INSTR_TIME_GET_MICROSEC(now) + usec; st->sleeping = 1; st->listen = 1; @@ -990,18 +986,23 @@ top: goto top; } + + return true; } /* discard connections */ static void -disconnect_all(CState *state) +disconnect_all(CState *state, int length) { int i; - for (i = 0; i < nclients; i++) + for (i = 0; i < length; i++) { if (state[i].con) + { 
PQfinish(state[i].con); + state[i].con = NULL; + } } } @@ -1267,6 +1268,24 @@ process_commands(char *buf) return NULL; } + /* + * Split argument into number and unit for "sleep 1ms" or so. + * We don't have to terminate the number argument with null + * because it will parsed with atoi, that ignores trailing + * non-digit characters. + */ + if (my_commands->argv[1][0] != ':') + { + char *c = my_commands->argv[1]; + while (isdigit(*c)) { c++; } + if (*c) + { + my_commands->argv[2] = c; + if (my_commands->argc < 3) + my_commands->argc = 3; + } + } + if (my_commands->argc >= 3) { if (pg_strcasecmp(my_commands->argv[2], "us") != 0 && @@ -1453,25 +1472,18 @@ process_builtin(char *tb) /* print out results */ static void -printResults( - int ttype, CState *state, - struct timeval * start_time, struct timeval * end_time) +printResults(int ttype, int normal_xacts, int nclients, int nthreads, + instr_time total_time, instr_time conn_total_time) { - double t1, - t2; - int i; - int normal_xacts = 0; + double time_include, + tps_include, + tps_exclude; char *s; - for (i = 0; i < nclients; i++) - normal_xacts += state[i].cnt; - - t1 = (end_time->tv_sec - start_time->tv_sec) * 1000000.0 + (end_time->tv_usec - start_time->tv_usec); - t1 = normal_xacts * 1000000.0 / t1; - - t2 = (end_time->tv_sec - start_time->tv_sec - conn_total_time.tv_sec) * 1000000.0 + - (end_time->tv_usec - start_time->tv_usec - conn_total_time.tv_usec); - t2 = normal_xacts * 1000000.0 / t2; + time_include = INSTR_TIME_GET_DOUBLE(total_time); + tps_include = normal_xacts / time_include; + tps_exclude = normal_xacts / (time_include - + (INSTR_TIME_GET_DOUBLE(conn_total_time) / nthreads)); if (ttype == 0) s = "TPC-B (sort of)"; @@ -1486,6 +1498,7 @@ printResults( printf("scaling factor: %d\n", scale); printf("query mode: %s\n", QUERYMODE[querymode]); printf("number of clients: %d\n", nclients); + printf("number of threads: %d\n", nthreads); if (duration <= 0) { printf("number of transactions per client: %d\n", 
nxacts); @@ -1498,8 +1511,8 @@ printResults( printf("number of transactions actually processed: %d\n", normal_xacts); } - printf("tps = %f (including connections establishing)\n", t1); - printf("tps = %f (excluding connections establishing)\n", t2); + printf("tps = %f (including connections establishing)\n", tps_include); + printf("tps = %f (excluding connections establishing)\n", tps_exclude); } @@ -1507,29 +1520,26 @@ int main(int argc, char **argv) { int c; + int nclients = 1; /* default number of simulated clients */ + int nthreads = 1; /* default number of threads */ int is_init_mode = 0; /* initialize mode? */ int is_no_vacuum = 0; /* no vacuum at all before testing? */ int do_vacuum_accounts = 0; /* do vacuum accounts before testing? */ - int debug = 0; /* debug flag */ int ttype = 0; /* transaction type. 0: TPC-B, 1: SELECT only, * 2: skip update of branches and tellers */ char *filename = NULL; bool scale_given = false; CState *state; /* status of clients */ + TState *threads; /* array of thread */ - struct timeval start_time; /* start up time */ - struct timeval end_time; /* end time */ + instr_time start_time; /* start up time */ + instr_time total_time; + instr_time conn_total_time; + int total_xacts; int i; - fd_set input_mask; - int nsocks; /* return from select(2) */ - int maxsock; /* max socket number to be waited */ - struct timeval now; - struct timeval timeout; - int min_usec; - #ifdef HAVE_GETRLIMIT struct rlimit rlim; #endif @@ -1579,7 +1589,7 @@ main(int argc, char **argv) memset(state, 0, sizeof(*state)); - while ((c = getopt(argc, argv, "ih:nvp:dSNc:Cs:t:T:U:lf:D:F:M:")) != -1) + while ((c = getopt(argc, argv, "ih:nvp:dSNc:Cs:t:T:U:lf:D:F:M:j:")) != -1) { switch (c) { @@ -1632,6 +1642,14 @@ main(int argc, char **argv) } #endif /* HAVE_GETRLIMIT */ break; + case 'j': /* jobs */ + nthreads = atoi(optarg); + if (nthreads <= 0) + { + fprintf(stderr, "invalid number of threads: %d\n", nthreads); + exit(1); + } + break; case 'C': is_connect = 1; 
break; @@ -1752,7 +1770,11 @@ main(int argc, char **argv) if (nxacts <= 0 && duration <= 0) nxacts = DEFAULT_NXACTS; - remains = nclients; + if (nclients % nthreads != 0) + { + fprintf(stderr, "number of clients (%d) must be a multiple number of threads (%d)\n", nclients, nthreads); + exit(1); + } if (nclients > 1) { @@ -1770,6 +1792,7 @@ main(int argc, char **argv) { int j; + state[i].id = i; for (j = 0; j < state[0].nvariables; j++) { if (putVariable(&state[i], state[0].variables[j].name, state[0].variables[j].value) == false) @@ -1879,33 +1902,8 @@ main(int argc, char **argv) PQfinish(con); /* set random seed */ - gettimeofday(&start_time, NULL); - srandom((unsigned int) start_time.tv_usec); - - /* get start up time */ - gettimeofday(&start_time, NULL); - - /* set alarm if duration is specified. */ - if (duration > 0) - setalarm(duration); - - if (is_connect == 0) - { - struct timeval t, - now; - - /* make connections to the database */ - for (i = 0; i < nclients; i++) - { - state[i].id = i; - if ((state[i].con = doConnect()) == NULL) - exit(1); - } - /* time after connections set up */ - gettimeofday(&now, NULL); - diffTime(&now, &start_time, &t); - addTime(&conn_total_time, &t, &conn_total_time); - } + INSTR_TIME_SET_CURRENT(start_time); + srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time)); /* process bultin SQL scripts */ switch (ttype) @@ -1929,140 +1927,227 @@ main(int argc, char **argv) break; } - /* send start up queries in async manner */ - for (i = 0; i < nclients; i++) + /* get start up time */ + INSTR_TIME_SET_CURRENT(start_time); + + /* set alarm if duration is specified. 
*/ + if (duration > 0) + setalarm(duration); + + /* start threads */ + threads = (TState *) malloc(sizeof(TState) * nthreads); + for (i = 0; i < nthreads; i++) { - Command **commands = sql_files[state[i].use_file]; - int prev_ecnt = state[i].ecnt; + threads[i].state = &state[nclients / nthreads * i]; + threads[i].nstate = nclients / nthreads; + INSTR_TIME_SET_CURRENT(threads[i].start_time); - state[i].use_file = getrand(0, num_files - 1); - doCustom(state, i, debug); - - if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND) + /* the first thread (i = 0) is executed by main thread */ + if (i > 0) { - fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, state[i].state); - remains--; /* I've aborted */ - PQfinish(state[i].con); - state[i].con = NULL; + int err = pthread_create(&threads[i].thread, NULL, threadRun, &threads[i]); + if (err != 0 || threads[i].thread == INVALID_THREAD) + { + fprintf(stderr, "cannot create thread: %s\n", strerror(err)); + exit(1); + } + } + else + { + threads[i].thread = INVALID_THREAD; } } - for (;;) + /* wait for threads and accumulate results */ + total_xacts = 0; + INSTR_TIME_SET_ZERO(conn_total_time); + for (i = 0; i < nthreads; i++) { - if (remains <= 0) - { /* all done ? 
*/ - disconnect_all(state); - /* get end time */ - gettimeofday(&end_time, NULL); - printResults(ttype, state, &start_time, &end_time); - if (LOGFILE) - fclose(LOGFILE); - exit(0); + void *ret = NULL; + + if (threads[i].thread == INVALID_THREAD) + ret = threadRun(&threads[i]); + else + pthread_join(threads[i].thread, &ret); + + if (ret != NULL) + { + TResult *r = (TResult *) ret; + total_xacts += r->xacts; + INSTR_TIME_ADD(conn_total_time, r->conn_time); + free(ret); } + } + disconnect_all(state, nclients); + + /* get end time */ + INSTR_TIME_SET_CURRENT(total_time); + INSTR_TIME_SUBTRACT(total_time, start_time); + printResults(ttype, total_xacts, nclients, nthreads, total_time, conn_total_time); + if (LOGFILE) + fclose(LOGFILE); + + return 0; +} + +static void * +threadRun(void *arg) +{ + TState *thread = (TState *) arg; + CState *state = thread->state; + TResult *result; + instr_time start, end; + int nstate = thread->nstate; + int remains = nstate; /* number of remaining clients */ + int i; + + result = malloc(sizeof(TResult)); + INSTR_TIME_SET_ZERO(result->conn_time); + + if (is_connect == 0) + { + /* make connections to the database */ + for (i = 0; i < nstate; i++) + { + if ((state[i].con = doConnect()) == NULL) + goto done; + } + } + + /* time after thread and connections set up */ + INSTR_TIME_SET_CURRENT(result->conn_time); + INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time); + + /* send start up queries in async manner */ + for (i = 0; i < nstate; i++) + { + CState *st = &state[i]; + Command **commands = sql_files[st->use_file]; + int prev_ecnt = st->ecnt; + + st->use_file = getrand(0, num_files - 1); + if (!doCustom(st, &result->conn_time)) + remains--; /* I've aborted */ + + if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND) + { + fprintf(stderr, "Client %d aborted in state %d. 
Execution meta-command failed.\n", i, st->state); + remains--; /* I've aborted */ + PQfinish(st->con); + st->con = NULL; + } + } + + while (remains > 0) + { + fd_set input_mask; + int maxsock; /* max socket number to be waited */ + int64 now_usec = 0; + int64 min_usec; FD_ZERO(&input_mask); maxsock = -1; - min_usec = -1; - for (i = 0; i < nclients; i++) + min_usec = INT64_MAX; + for (i = 0; i < nstate; i++) { - Command **commands = sql_files[state[i].use_file]; + CState *st = &state[i]; + Command **commands = sql_files[st->use_file]; + int sock; - if (state[i].sleeping) + if (st->sleeping) { int this_usec; - int sock = PQsocket(state[i].con); - if (min_usec < 0) + if (min_usec == INT64_MAX) { - gettimeofday(&now, NULL); - min_usec = 0; + instr_time now; + INSTR_TIME_SET_CURRENT(now); + now_usec = INSTR_TIME_GET_MICROSEC(now); } - this_usec = (state[i].until.tv_sec - now.tv_sec) * 1000000 + - state[i].until.tv_usec - now.tv_usec; - - if (this_usec > 0 && (min_usec == 0 || this_usec < min_usec)) + this_usec = st->until - now_usec; + if (min_usec > this_usec) min_usec = this_usec; - - FD_SET (sock, &input_mask); - - if (maxsock < sock) - maxsock = sock; } - else if (state[i].con && commands[state[i].state]->type != META_COMMAND) + else if (st->con == NULL) { - int sock = PQsocket(state[i].con); - - if (sock < 0) - { - disconnect_all(state); - exit(1); - } - FD_SET (sock, &input_mask); - - if (maxsock < sock) - maxsock = sock; + continue; } + else if (commands[st->state]->type == META_COMMAND) + { + min_usec = 0; /* the connection is ready to run */ + break; + } + + sock = PQsocket(st->con); + if (sock < 0) + { + fprintf(stderr, "bad socket: %s\n", strerror(errno)); + goto done; + } + + FD_SET(sock, &input_mask); + if (maxsock < sock) + maxsock = sock; } - if (maxsock != -1) + if (min_usec > 0 && maxsock != -1) { - if (min_usec >= 0) + int nsocks; /* return from select(2) */ + + if (min_usec != INT64_MAX) { + struct timeval timeout; timeout.tv_sec = min_usec / 1000000; 
timeout.tv_usec = min_usec % 1000000; - - nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL, - (fd_set *) NULL, &timeout); + nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout); } else - nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL, - (fd_set *) NULL, (struct timeval *) NULL); + nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL); if (nsocks < 0) { if (errno == EINTR) continue; /* must be something wrong */ - disconnect_all(state); fprintf(stderr, "select failed: %s\n", strerror(errno)); - exit(1); + goto done; } -#ifdef NOT_USED - else if (nsocks == 0) - { /* timeout */ - fprintf(stderr, "select timeout\n"); - for (i = 0; i < nclients; i++) - { - fprintf(stderr, "client %d:state %d cnt %d ecnt %d listen %d\n", - i, state[i].state, state[i].cnt, state[i].ecnt, state[i].listen); - } - exit(0); - } -#endif } /* ok, backend returns reply */ - for (i = 0; i < nclients; i++) + for (i = 0; i < nstate; i++) { - Command **commands = sql_files[state[i].use_file]; - int prev_ecnt = state[i].ecnt; + CState *st = &state[i]; + Command **commands = sql_files[st->use_file]; + int prev_ecnt = st->ecnt; - if (state[i].con && (FD_ISSET(PQsocket(state[i].con), &input_mask) - || commands[state[i].state]->type == META_COMMAND)) + if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask) + || commands[st->state]->type == META_COMMAND)) { - doCustom(state, i, debug); + if (!doCustom(st, &result->conn_time)) + remains--; /* I've aborted */ } - if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND) + if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND) { - fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, state[i].state); + fprintf(stderr, "Client %d aborted in state %d. 
Execution of meta-command failed.\n", i, st->state); remains--; /* I've aborted */ - PQfinish(state[i].con); - state[i].con = NULL; + PQfinish(st->con); + st->con = NULL; } } } + +done: + INSTR_TIME_SET_CURRENT(start); + disconnect_all(state, nstate); + result->xacts = 0; + for (i = 0; i < nstate; i++) + result->xacts += state[i].cnt; + INSTR_TIME_SET_CURRENT(end); + INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start); + return result; } @@ -2084,6 +2169,87 @@ setalarm(int seconds) pqsignal(SIGALRM, handle_sig_alarm); alarm(seconds); } + +#ifndef ENABLE_THREAD_SAFETY + +/* + * implements pthread using fork. + */ + +typedef struct fork_pthread +{ + pid_t pid; + int pipes[2]; +} fork_pthread; + +static int +pthread_create(pthread_t *thread, + pthread_attr_t *attr, + void * (*start_routine)(void *), + void *arg) +{ + fork_pthread *th; + void *ret; + + th = (fork_pthread *) malloc(sizeof(fork_pthread)); + pipe(th->pipes); + + th->pid = fork(); + if (th->pid == -1) /* error */ + { + free(th); + return errno; + } + if (th->pid != 0) /* parent process */ + { + close(th->pipes[1]); + *thread = th; + return 0; + } + + /* child process */ + close(th->pipes[0]); + + /* set alarm again because the child does not inherit timers */ + if (duration > 0) + setalarm(duration); + + ret = start_routine(arg); + write(th->pipes[1], ret, sizeof(TResult)); + close(th->pipes[1]); + free(th); + exit(0); +} + +static int +pthread_join(pthread_t th, void **thread_return) +{ + int status; + + while (waitpid(th->pid, &status, 0) != th->pid) + { + if (errno != EINTR) + return errno; + } + + if (thread_return != NULL) + { + /* assume result is TResult */ + *thread_return = malloc(sizeof(TResult)); + if (read(th->pipes[0], *thread_return, sizeof(TResult)) != sizeof(TResult)) + { + free(*thread_return); + *thread_return = NULL; + } + } + close(th->pipes[0]); + + free(th); + return 0; +} + +#endif + #else /* WIN32 */ static VOID CALLBACK @@ -2110,4 +2276,70 @@ setalarm(int seconds) } } +/* partial 
pthread implementation for Windows */ + +typedef struct win32_pthread +{ + HANDLE handle; + void *(*routine)(void *); + void *arg; + void *result; +} win32_pthread; + +static unsigned __stdcall +win32_pthread_run(void *arg) +{ + win32_pthread *th = (win32_pthread *) arg; + + th->result = th->routine(th->arg); + + return 0; +} + +static int +pthread_create(pthread_t *thread, + pthread_attr_t *attr, + void * (*start_routine)(void *), + void *arg) +{ + int save_errno; + win32_pthread *th; + + th = (win32_pthread *) malloc(sizeof(win32_pthread)); + th->routine = start_routine; + th->arg = arg; + th->result = NULL; + + th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL); + if (th->handle == NULL) + { + save_errno = errno; + free(th); + return save_errno; + } + + *thread = th; + return 0; +} + +static int +pthread_join(pthread_t th, void **thread_return) +{ + if (th == NULL || th->handle == NULL) + return errno = EINVAL; + + if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0) + { + _dosmaperr(GetLastError()); + return errno; + } + + if (thread_return) + *thread_return = th->result; + + CloseHandle(th->handle); + free(th); + return 0; +} + #endif /* WIN32 */ diff --git a/doc/src/sgml/pgbench.sgml b/doc/src/sgml/pgbench.sgml index 5c30e8499f..c34f7acbbb 100644 --- a/doc/src/sgml/pgbench.sgml +++ b/doc/src/sgml/pgbench.sgml @@ -1,4 +1,4 @@ - + pgbench @@ -171,6 +171,14 @@ pgbench options dbname sessions. Default is 1. + + -j threads + + Number of worker threads. Clients are equally-divided into those + threads and executed in it. The number of clients must be a multiple + number of threads. Default is 1. + + -t transactions