From 12788ae49e1933f463bc59a6efe46c4a01701b76 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Mon, 26 Sep 2016 10:56:02 +0300 Subject: [PATCH] Refactor script execution state machine in pgbench. The doCustom() function had grown into quite a mess. Rewrite it, in a more explicit state machine style, for readability. This also fixes one minor bug: if a script consisted entirely of meta commands, doCustom() never returned to the caller, so progress reports with the -P option were not printed. I don't want to backpatch this refactoring, and the bug is quite insignificant, so only commit this to master, and leave the bug unfixed in back-branches. Review and original bug report by Fabien Coelho. Discussion: --- src/bin/pgbench/pgbench.c | 1176 ++++++++++++++++++++++--------------- 1 file changed, 696 insertions(+), 480 deletions(-) diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 8b24ad50e7..1fb4ae46d5 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -235,25 +235,95 @@ typedef struct StatsData } StatsData; /* - * Connection state + * Connection state machine states. + */ +typedef enum +{ + /* + * The client must first choose a script to execute. Once chosen, it can + * either be throttled (state CSTATE_START_THROTTLE under --rate) or start + * right away (state CSTATE_START_TX). + */ + CSTATE_CHOOSE_SCRIPT, + + /* + * In CSTATE_START_THROTTLE state, we calculate when to begin the next + * transaction, and advance to CSTATE_THROTTLE. CSTATE_THROTTLE state + * sleeps until that moment. (If throttling is not enabled, doCustom() + * falls directly through from CSTATE_START_THROTTLE to CSTATE_START_TX.) + */ + CSTATE_START_THROTTLE, + CSTATE_THROTTLE, + + /* + * CSTATE_START_TX performs start-of-transaction processing. Establishes + * a new connection for the transaction, in --connect mode, and records + * the transaction start time. + */ + CSTATE_START_TX, + + /* + * We loop through these states, to process each command in the script: + * + * CSTATE_START_COMMAND starts the execution of a command. On a SQL + * command, the command is sent to the server, and we move to + * CSTATE_WAIT_RESULT state. On a \sleep meta-command, the timer is set, + * and we enter the CSTATE_SLEEP state to wait for it to expire. Other + * meta-commands are executed immediately. + * + * CSTATE_WAIT_RESULT waits until we get a result set back from the server + * for the current command. + * + * CSTATE_SLEEP waits until the end of \sleep. + * + * CSTATE_END_COMMAND records the end-of-command timestamp, increments the + * command counter, and loops back to CSTATE_START_COMMAND state. + */ + CSTATE_START_COMMAND, + CSTATE_WAIT_RESULT, + CSTATE_SLEEP, + CSTATE_END_COMMAND, + + /* + * CSTATE_END_TX performs end-of-transaction processing. Calculates + * latency, and logs the transaction. In --connect mode, closes the + * current connection. Chooses the next script to execute and starts over + * in CSTATE_START_THROTTLE state, or enters CSTATE_FINISHED if we have no + * more work to do. + */ + CSTATE_END_TX, + + /* + * Final states. CSTATE_ABORTED means that the script execution was + * aborted because a command failed, CSTATE_FINISHED means success. + */ + CSTATE_ABORTED, + CSTATE_FINISHED +} ConnectionStateEnum; + +/* + * Connection state. */ typedef struct { PGconn *con; /* connection handle to DB */ int id; /* client No. */ - int state; /* state No. */ - bool listen; /* whether an async query has been sent */ - bool sleeping; /* whether the client is napping */ - bool throttling; /* whether nap is for throttling */ - bool is_throttled; /* whether transaction throttling is done */ + ConnectionStateEnum state; /* state machine's current state. */ + + int use_file; /* index in sql_script for this client */ + int command; /* command number in script */ + + /* client variables */ Variable *variables; /* array of variable definitions */ int nvariables; /* number of variables */ bool vars_sorted; /* are variables sorted by name? */ + + /* various times about current transaction */ int64 txn_scheduled; /* scheduled start time of transaction (usec) */ int64 sleep_until; /* scheduled start time of next cmd (usec) */ instr_time txn_begin; /* used for measuring schedule lag times */ instr_time stmt_begin; /* used for measuring statement latencies */ - int use_file; /* index in sql_scripts for this client */ + bool prepared[MAX_SCRIPTS]; /* whether client prepared the script */ /* per client collected stats */ @@ -1382,7 +1452,7 @@ evalFunc(TState *thread, CState *st, Assert(nargs == 1); fprintf(stderr, "debug(script=%d,command=%d): ", - st->use_file, st->state + 1); + st->use_file, st->command + 1); if (varg->type == PGBT_INT) fprintf(stderr, "int " INT64_FORMAT "\n", varg->u.ival); @@ -1733,15 +1803,12 @@ preparedStatementName(char *buffer, int file, int state) sprintf(buffer, "P%d_%d", file, state); } -static bool -clientDone(CState *st) +static void +commandFailed(CState *st, char *message) { - if (st->con != NULL) - { - PQfinish(st->con); - st->con = NULL; - } - return false; /* always false */ + fprintf(stderr, + "client %d aborted in command %d of script %d; %s\n", + st->id, st->command, st->use_file, message); } /* return a script number with a weighted choice. */ @@ -1763,425 +1830,595 @@ chooseScript(TState *thread) return i - 1; } -/* return false iff client should be disconnected */ +/* Send a SQL command, using the chosen querymode */ static bool +sendCommand(CState *st, Command *command) +{ + int r; + + if (querymode == QUERY_SIMPLE) + { + char *sql; + + sql = pg_strdup(command->argv[0]); + sql = assignVariables(st, sql); + + if (debug) + fprintf(stderr, "client %d sending %s\n", st->id, sql); + r = PQsendQuery(st->con, sql); + free(sql); + } + else if (querymode == QUERY_EXTENDED) + { + const char *sql = command->argv[0]; + const char *params[MAX_ARGS]; + + getQueryParams(st, command, params); + + if (debug) + fprintf(stderr, "client %d sending %s\n", st->id, sql); + r = PQsendQueryParams(st->con, sql, command->argc - 1, + NULL, params, NULL, NULL, 0); + } + else if (querymode == QUERY_PREPARED) + { + char name[MAX_PREPARE_NAME]; + const char *params[MAX_ARGS]; + + if (!st->prepared[st->use_file]) + { + int j; + Command **commands = sql_script[st->use_file].commands; + + for (j = 0; commands[j] != NULL; j++) + { + PGresult *res; + char name[MAX_PREPARE_NAME]; + + if (commands[j]->type != SQL_COMMAND) + continue; + preparedStatementName(name, st->use_file, j); + res = PQprepare(st->con, name, + commands[j]->argv[0], commands[j]->argc - 1, NULL); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + fprintf(stderr, "%s", PQerrorMessage(st->con)); + PQclear(res); + } + st->prepared[st->use_file] = true; + } + + getQueryParams(st, command, params); + preparedStatementName(name, st->use_file, st->command); + + if (debug) + fprintf(stderr, "client %d sending %s\n", st->id, name); + r = PQsendQueryPrepared(st->con, name, command->argc - 1, + params, NULL, NULL, 0); + } + else /* unknown sql mode */ + r = 0; + + if (r == 0) + { + if (debug) + fprintf(stderr, "client %d could not send %s\n", + st->id, command->argv[0]); + st->ecnt++; + return false; + } + else + return true; +} + +/* + * Parse the argument to a \sleep command, and return the requested amount + * of delay, in microseconds. Returns true on success, false on error. + */ +static bool +evaluateSleep(CState *st, int argc, char **argv, int *usecs) +{ + char *var; + int usec; + + if (*argv[1] == ':') + { + if ((var = getVariable(st, argv[1] + 1)) == NULL) + { + fprintf(stderr, "%s: undefined variable \"%s\"\n", + argv[0], argv[1]); + return false; + } + usec = atoi(var); + } + else + usec = atoi(argv[1]); + + if (argc > 2) + { + if (pg_strcasecmp(argv[2], "ms") == 0) + usec *= 1000; + else if (pg_strcasecmp(argv[2], "s") == 0) + usec *= 1000000; + } + else + usec *= 1000000; + + *usecs = usec; + return true; +} + +/* + * Advance the state machine of a connection, if possible. + */ +static void doCustom(TState *thread, CState *st, StatsData *agg) { PGresult *res; - Command **commands; - bool trans_needs_throttle = false; + Command *command; instr_time now; + bool end_tx_processed = false; + int64 wait; /* * gettimeofday() isn't free, so we get the current timestamp lazily the * first time it's needed, and reuse the same value throughout this - * function after that. This also ensures that e.g. the calculated latency - * reported in the log file and in the totals are the same. Zero means - * "not set yet". Reset "now" when we step to the next command with "goto - * top", though. + * function after that. This also ensures that e.g. the calculated + * latency reported in the log file and in the totals are the same. Zero + * means "not set yet". Reset "now" when we execute shell commands or + * expressions, which might take a non-negligible amount of time, though. */ -top: INSTR_TIME_SET_ZERO(now); - commands = sql_script[st->use_file].commands; - /* - * Handle throttling once per transaction by sleeping. It is simpler to - * do this here rather than at the end, because so much complicated logic - * happens below when statements finish. + * Loop in the state machine, until we have to wait for a result from the + * server (or have to sleep, for throttling or for \sleep). + * + * Note: In the switch-statement below, 'break' will loop back here, + * meaning "continue in the state machine". Return is used to return to + * the caller. */ - if (throttle_delay && !st->is_throttled) + for (;;) { - /* - * Generate a delay such that the series of delays will approximate a - * Poisson distribution centered on the throttle_delay time. - * - * If transactions are too slow or a given wait is shorter than a - * transaction, the next transaction will start right away. - */ - int64 wait = getPoissonRand(thread, throttle_delay); - - thread->throttle_trigger += wait; - st->txn_scheduled = thread->throttle_trigger; - - /* stop client if next transaction is beyond pgbench end of execution */ - if (duration > 0 && st->txn_scheduled > end_time) - return clientDone(st); - - /* - * If this --latency-limit is used, and this slot is already late so - * that the transaction will miss the latency limit even if it - * completed immediately, we skip this time slot and iterate till the - * next slot that isn't late yet. - */ - if (latency_limit) + switch (st->state) { - int64 now_us; + /* + * Select transaction to run. + */ + case CSTATE_CHOOSE_SCRIPT: - if (INSTR_TIME_IS_ZERO(now)) - INSTR_TIME_SET_CURRENT(now); - now_us = INSTR_TIME_GET_MICROSEC(now); - while (thread->throttle_trigger < now_us - latency_limit) - { - processXactStats(thread, st, &now, true, agg); - /* next rendez-vous */ + st->use_file = chooseScript(thread); + + if (debug) + fprintf(stderr, "client %d executing script \"%s\"\n", st->id, + sql_script[st->use_file].desc); + + if (throttle_delay > 0) + st->state = CSTATE_START_THROTTLE; + else + st->state = CSTATE_START_TX; + break; + + /* + * Handle throttling once per transaction by sleeping. + */ + case CSTATE_START_THROTTLE: + + /* + * Generate a delay such that the series of delays will + * approximate a Poisson distribution centered on the + * throttle_delay time. + * + * If transactions are too slow or a given wait is shorter + * than a transaction, the next transaction will start right + * away. + */ + Assert(throttle_delay > 0); wait = getPoissonRand(thread, throttle_delay); + thread->throttle_trigger += wait; st->txn_scheduled = thread->throttle_trigger; - } - } - st->sleep_until = st->txn_scheduled; - st->sleeping = true; - st->throttling = true; - st->is_throttled = true; - if (debug) - fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n", - st->id, wait); - } - - if (st->sleeping) - { /* are we sleeping? */ - if (INSTR_TIME_IS_ZERO(now)) - INSTR_TIME_SET_CURRENT(now); - if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until) - return true; /* Still sleeping, nothing to do here */ - /* Else done sleeping, go ahead with next command */ - st->sleeping = false; - st->throttling = false; - } - - if (st->listen) - { /* are we receiver? */ - if (commands[st->state]->type == SQL_COMMAND) - { - if (debug) - fprintf(stderr, "client %d receiving\n", st->id); - if (!PQconsumeInput(st->con)) - { /* there's something wrong */ - fprintf(stderr, "client %d aborted in state %d; perhaps the backend died while processing\n", st->id, st->state); - return clientDone(st); - } - if (PQisBusy(st->con)) - return true; /* don't have the whole result yet */ - } - - /* - * command finished: accumulate per-command execution times in - * thread-local data structure, if per-command latencies are requested - */ - if (is_latencies) - { - if (INSTR_TIME_IS_ZERO(now)) - INSTR_TIME_SET_CURRENT(now); - - /* XXX could use a mutex here, but we choose not to */ - addToSimpleStats(&commands[st->state]->stats, - INSTR_TIME_GET_DOUBLE(now) - - INSTR_TIME_GET_DOUBLE(st->stmt_begin)); - } - - /* transaction finished: calculate latency and log the transaction */ - if (commands[st->state + 1] == NULL) - { - if (progress || throttle_delay || latency_limit || - per_script_stats || use_log) - processXactStats(thread, st, &now, false, agg); - else - thread->stats.cnt++; - } - - if (commands[st->state]->type == SQL_COMMAND) - { - /* - * Read and discard the query result; note this is not included in - * the statement latency numbers. - */ - res = PQgetResult(st->con); - switch (PQresultStatus(res)) - { - case PGRES_COMMAND_OK: - case PGRES_TUPLES_OK: - case PGRES_EMPTY_QUERY: - break; /* OK */ - default: - fprintf(stderr, "client %d aborted in state %d: %s", - st->id, st->state, PQerrorMessage(st->con)); - PQclear(res); - return clientDone(st); - } - PQclear(res); - discard_response(st); - } - - if (commands[st->state + 1] == NULL) - { - if (is_connect) - { - PQfinish(st->con); - st->con = NULL; - } - - ++st->cnt; - if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded) - return clientDone(st); /* exit success */ - } - - /* increment state counter */ - st->state++; - if (commands[st->state] == NULL) - { - st->state = 0; - st->use_file = chooseScript(thread); - commands = sql_script[st->use_file].commands; - if (debug) - fprintf(stderr, "client %d executing script \"%s\"\n", st->id, - sql_script[st->use_file].desc); - st->is_throttled = false; - - /* - * No transaction is underway anymore, which means there is - * nothing to listen to right now. When throttling rate limits - * are active, a sleep will happen next, as the next transaction - * starts. And then in any case the next SQL command will set - * listen back to true. - */ - st->listen = false; - trans_needs_throttle = (throttle_delay > 0); - } - } - - if (st->con == NULL) - { - instr_time start, - end; - - INSTR_TIME_SET_CURRENT(start); - if ((st->con = doConnect()) == NULL) - { - fprintf(stderr, "client %d aborted while establishing connection\n", - st->id); - return clientDone(st); - } - INSTR_TIME_SET_CURRENT(end); - INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start); - - /* Reset session-local state */ - st->listen = false; - st->sleeping = false; - st->throttling = false; - st->is_throttled = false; - memset(st->prepared, 0, sizeof(st->prepared)); - } - - /* - * This ensures that a throttling delay is inserted before proceeding with - * sql commands, after the first transaction. The first transaction - * throttling is performed when first entering doCustom. - */ - if (trans_needs_throttle) - { - trans_needs_throttle = false; - goto top; - } - - /* Record transaction start time under logging, progress or throttling */ - if ((use_log || progress || throttle_delay || latency_limit || - per_script_stats) && st->state == 0) - { - INSTR_TIME_SET_CURRENT(st->txn_begin); - - /* - * When not throttling, this is also the transaction's scheduled start - * time. - */ - if (!throttle_delay) - st->txn_scheduled = INSTR_TIME_GET_MICROSEC(st->txn_begin); - } - - /* Record statement start time if per-command latencies are requested */ - if (is_latencies) - INSTR_TIME_SET_CURRENT(st->stmt_begin); - - if (commands[st->state]->type == SQL_COMMAND) - { - const Command *command = commands[st->state]; - int r; - - if (querymode == QUERY_SIMPLE) - { - char *sql; - - sql = pg_strdup(command->argv[0]); - sql = assignVariables(st, sql); - - if (debug) - fprintf(stderr, "client %d sending %s\n", st->id, sql); - r = PQsendQuery(st->con, sql); - free(sql); - } - else if (querymode == QUERY_EXTENDED) - { - const char *sql = command->argv[0]; - const char *params[MAX_ARGS]; - - getQueryParams(st, command, params); - - if (debug) - fprintf(stderr, "client %d sending %s\n", st->id, sql); - r = PQsendQueryParams(st->con, sql, command->argc - 1, - NULL, params, NULL, NULL, 0); - } - else if (querymode == QUERY_PREPARED) - { - char name[MAX_PREPARE_NAME]; - const char *params[MAX_ARGS]; - - if (!st->prepared[st->use_file]) - { - int j; - - for (j = 0; commands[j] != NULL; j++) + /* + * stop client if next transaction is beyond pgbench end of + * execution + */ + if (duration > 0 && st->txn_scheduled > end_time) { - PGresult *res; - char name[MAX_PREPARE_NAME]; - - if (commands[j]->type != SQL_COMMAND) - continue; - preparedStatementName(name, st->use_file, j); - res = PQprepare(st->con, name, - commands[j]->argv[0], commands[j]->argc - 1, NULL); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - fprintf(stderr, "%s", PQerrorMessage(st->con)); - PQclear(res); + st->state = CSTATE_FINISHED; + break; } - st->prepared[st->use_file] = true; - } - getQueryParams(st, command, params); - preparedStatementName(name, st->use_file, st->state); - - if (debug) - fprintf(stderr, "client %d sending %s\n", st->id, name); - r = PQsendQueryPrepared(st->con, name, command->argc - 1, - params, NULL, NULL, 0); - } - else /* unknown sql mode */ - r = 0; - - if (r == 0) - { - if (debug) - fprintf(stderr, "client %d could not send %s\n", - st->id, command->argv[0]); - st->ecnt++; - } - else - st->listen = true; /* flags that should be listened */ - } - else if (commands[st->state]->type == META_COMMAND) - { - int argc = commands[st->state]->argc, - i; - char **argv = commands[st->state]->argv; - - if (debug) - { - fprintf(stderr, "client %d executing \\%s", st->id, argv[0]); - for (i = 1; i < argc; i++) - fprintf(stderr, " %s", argv[i]); - fprintf(stderr, "\n"); - } - - if (pg_strcasecmp(argv[0], "set") == 0) - { - PgBenchExpr *expr = commands[st->state]->expr; - PgBenchValue result; - - if (!evaluateExpr(thread, st, expr, &result)) - { - st->ecnt++; - return true; - } - - if (!putVariableNumber(st, argv[0], argv[1], &result)) - { - st->ecnt++; - return true; - } - - st->listen = true; - } - else if (pg_strcasecmp(argv[0], "sleep") == 0) - { - char *var; - int usec; - instr_time now; - - if (*argv[1] == ':') - { - if ((var = getVariable(st, argv[1] + 1)) == NULL) + /* + * If this --latency-limit is used, and this slot is already + * late so that the transaction will miss the latency limit + * even if it completed immediately, we skip this time slot + * and iterate till the next slot that isn't late yet. + */ + if (latency_limit) { - fprintf(stderr, "%s: undefined variable \"%s\"\n", - argv[0], argv[1]); - st->ecnt++; - return true; + int64 now_us; + + if (INSTR_TIME_IS_ZERO(now)) + INSTR_TIME_SET_CURRENT(now); + now_us = INSTR_TIME_GET_MICROSEC(now); + while (thread->throttle_trigger < now_us - latency_limit) + { + processXactStats(thread, st, &now, true, agg); + /* next rendez-vous */ + wait = getPoissonRand(thread, throttle_delay); + thread->throttle_trigger += wait; + st->txn_scheduled = thread->throttle_trigger; + } } - usec = atoi(var); - } - else - usec = atoi(argv[1]); - if (argc > 2) - { - if (pg_strcasecmp(argv[2], "ms") == 0) - usec *= 1000; - else if (pg_strcasecmp(argv[2], "s") == 0) - usec *= 1000000; - } - else - usec *= 1000000; + st->state = CSTATE_THROTTLE; + if (debug) + fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n", + st->id, wait); + break; - INSTR_TIME_SET_CURRENT(now); - st->sleep_until = INSTR_TIME_GET_MICROSEC(now) + usec; - st->sleeping = true; + /* + * Wait until it's time to start next transaction. + */ + case CSTATE_THROTTLE: + if (INSTR_TIME_IS_ZERO(now)) + INSTR_TIME_SET_CURRENT(now); + if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled) + return; /* Still sleeping, nothing to do here */ - st->listen = true; + /* Else done sleeping, start the transaction */ + st->state = CSTATE_START_TX; + break; + + /* Start new transaction */ + case CSTATE_START_TX: + + /* + * Establish connection on first call, or if is_connect is + * true. + */ + if (st->con == NULL) + { + instr_time start; + + if (INSTR_TIME_IS_ZERO(now)) + INSTR_TIME_SET_CURRENT(now); + start = now; + if ((st->con = doConnect()) == NULL) + { + fprintf(stderr, "client %d aborted while establishing connection\n", + st->id); + st->state = CSTATE_ABORTED; + break; + } + INSTR_TIME_SET_CURRENT(now); + INSTR_TIME_ACCUM_DIFF(thread->conn_time, now, start); + + /* Reset session-local state */ + memset(st->prepared, 0, sizeof(st->prepared)); + } + + /* + * Record transaction start time under logging, progress or + * throttling. + */ + if (use_log || progress || throttle_delay || latency_limit || + per_script_stats) + { + if (INSTR_TIME_IS_ZERO(now)) + INSTR_TIME_SET_CURRENT(now); + st->txn_begin = now; + + /* + * When not throttling, this is also the transaction's + * scheduled start time. + */ + if (!throttle_delay) + st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now); + } + + /* Begin with the first command */ + st->command = 0; + st->state = CSTATE_START_COMMAND; + break; + + /* + * Send a command to server (or execute a meta-command) + */ + case CSTATE_START_COMMAND: + command = sql_script[st->use_file].commands[st->command]; + + /* + * If we reached the end of the script, move to end-of-xact + * processing. + */ + if (command == NULL) + { + st->state = CSTATE_END_TX; + break; + } + + /* + * Record statement start time if per-command latencies are + * requested + */ + if (is_latencies) + { + if (INSTR_TIME_IS_ZERO(now)) + INSTR_TIME_SET_CURRENT(now); + st->stmt_begin = now; + } + + if (command->type == SQL_COMMAND) + { + if (!sendCommand(st, command)) + { + /* + * Failed. Stay in CSTATE_START_COMMAND state, to + * retry. ??? What the point or retrying? Should + * rather abort? + */ + return; + } + else + st->state = CSTATE_WAIT_RESULT; + } + else if (command->type == META_COMMAND) + { + int argc = command->argc, + i; + char **argv = command->argv; + + if (debug) + { + fprintf(stderr, "client %d executing \\%s", st->id, argv[0]); + for (i = 1; i < argc; i++) + fprintf(stderr, " %s", argv[i]); + fprintf(stderr, "\n"); + } + + if (pg_strcasecmp(argv[0], "sleep") == 0) + { + /* + * A \sleep doesn't execute anything, we just get the + * delay from the argument, and enter the CSTATE_SLEEP + * state. (The per-command latency will be recorded + * in CSTATE_SLEEP state, not here, after the delay + * has elapsed.) + */ + int usec; + + if (!evaluateSleep(st, argc, argv, &usec)) + { + commandFailed(st, "execution of meta-command 'sleep' failed"); + st->state = CSTATE_ABORTED; + break; + } + + if (INSTR_TIME_IS_ZERO(now)) + INSTR_TIME_SET_CURRENT(now); + st->sleep_until = INSTR_TIME_GET_MICROSEC(now) + usec; + st->state = CSTATE_SLEEP; + break; + } + else + { + if (pg_strcasecmp(argv[0], "set") == 0) + { + PgBenchExpr *expr = command->expr; + PgBenchValue result; + + if (!evaluateExpr(thread, st, expr, &result)) + { + commandFailed(st, "evaluation of meta-command 'set' failed"); + st->state = CSTATE_ABORTED; + break; + } + + if (!putVariableNumber(st, argv[0], argv[1], &result)) + { + commandFailed(st, "assignment of meta-command 'set' failed"); + st->state = CSTATE_ABORTED; + break; + } + } + else if (pg_strcasecmp(argv[0], "setshell") == 0) + { + bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2); + + if (timer_exceeded) /* timeout */ + { + st->state = CSTATE_FINISHED; + break; + } + else if (!ret) /* on error */ + { + commandFailed(st, "execution of meta-command 'setshell' failed"); + st->state = CSTATE_ABORTED; + break; + } + else + { + /* succeeded */ + } + } + else if (pg_strcasecmp(argv[0], "shell") == 0) + { + bool ret = runShellCommand(st, NULL, argv + 1, argc - 1); + + if (timer_exceeded) /* timeout */ + { + st->state = CSTATE_FINISHED; + break; + } + else if (!ret) /* on error */ + { + commandFailed(st, "execution of meta-command 'shell' failed"); + st->state = CSTATE_ABORTED; + break; + } + else + { + /* succeeded */ + } + } + + /* + * executing the expression or shell command might + * take a non-negligible amount of time, so reset + * 'now' + */ + INSTR_TIME_SET_ZERO(now); + + st->state = CSTATE_END_COMMAND; + } + } + break; + + /* + * Wait for the current SQL command to complete + */ + case CSTATE_WAIT_RESULT: + command = sql_script[st->use_file].commands[st->command]; + if (debug) + fprintf(stderr, "client %d receiving\n", st->id); + if (!PQconsumeInput(st->con)) + { /* there's something wrong */ + commandFailed(st, "perhaps the backend died while processing"); + st->state = CSTATE_ABORTED; + break; + } + if (PQisBusy(st->con)) + return; /* don't have the whole result yet */ + + /* + * Read and discard the query result; + */ + res = PQgetResult(st->con); + switch (PQresultStatus(res)) + { + case PGRES_COMMAND_OK: + case PGRES_TUPLES_OK: + case PGRES_EMPTY_QUERY: + /* OK */ + PQclear(res); + discard_response(st); + st->state = CSTATE_END_COMMAND; + break; + default: + commandFailed(st, PQerrorMessage(st->con)); + PQclear(res); + st->state = CSTATE_ABORTED; + break; + } + break; + + /* + * Wait until sleep is done. This state is entered after a + * \sleep metacommand. The behavior is similar to + * CSTATE_THROTTLE, but proceeds to CSTATE_START_COMMAND + * instead of CSTATE_START_TX. + */ + case CSTATE_SLEEP: + if (INSTR_TIME_IS_ZERO(now)) + INSTR_TIME_SET_CURRENT(now); + if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until) + return; /* Still sleeping, nothing to do here */ + /* Else done sleeping. */ + st->state = CSTATE_END_COMMAND; + break; + + /* + * End of command: record stats and proceed to next command. + */ + case CSTATE_END_COMMAND: + + /* + * command completed: accumulate per-command execution times + * in thread-local data structure, if per-command latencies + * are requested. + */ + if (is_latencies) + { + if (INSTR_TIME_IS_ZERO(now)) + INSTR_TIME_SET_CURRENT(now); + + /* XXX could use a mutex here, but we choose not to */ + command = sql_script[st->use_file].commands[st->command]; + addToSimpleStats(&command->stats, + INSTR_TIME_GET_DOUBLE(now) - + INSTR_TIME_GET_DOUBLE(st->stmt_begin)); + } + + /* Go ahead with next command */ + st->command++; + st->state = CSTATE_START_COMMAND; + break; + + /* + * End of transaction. + */ + case CSTATE_END_TX: + + /* + * transaction finished: calculate latency and log the + * transaction + */ + if (progress || throttle_delay || latency_limit || + per_script_stats || use_log) + processXactStats(thread, st, &now, false, agg); + else + thread->stats.cnt++; + + if (is_connect) + { + PQfinish(st->con); + st->con = NULL; + INSTR_TIME_SET_ZERO(now); + } + + ++st->cnt; + if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded) + { + /* exit success */ + st->state = CSTATE_FINISHED; + break; + } + + /* + * No transaction is underway anymore. + */ + st->state = CSTATE_CHOOSE_SCRIPT; + + /* + * If we paced through all commands in the script in this + * loop, without returning to the caller even once, do it now. + * This gives the thread a chance to process other + * connections, and to do progress reporting. This can + * currently only happen if the script consists entirely of + * meta-commands. + */ + if (end_tx_processed) + return; + else + { + end_tx_processed = true; + break; + } + + /* + * Final states. Close the connection if it's still open. + */ + case CSTATE_ABORTED: + case CSTATE_FINISHED: + if (st->con != NULL) + { + PQfinish(st->con); + st->con = NULL; + } + return; } - else if (pg_strcasecmp(argv[0], "setshell") == 0) - { - bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2); - - if (timer_exceeded) /* timeout */ - return clientDone(st); - else if (!ret) /* on error */ - { - st->ecnt++; - return true; - } - else /* succeeded */ - st->listen = true; - } - else if (pg_strcasecmp(argv[0], "shell") == 0) - { - bool ret = runShellCommand(st, NULL, argv + 1, argc - 1); - - if (timer_exceeded) /* timeout */ - return clientDone(st); - else if (!ret) /* on error */ - { - st->ecnt++; - return true; - } - else /* succeeded */ - st->listen = true; - } - - /* after a meta command, immediately proceed with next command */ - goto top; } - - return true; } /* @@ -4183,29 +4420,10 @@ threadRun(void *arg) initStats(&aggs, INSTR_TIME_GET_DOUBLE(thread->start_time)); last = aggs; - /* send start up queries in async manner */ + /* initialize explicitely the state machines */ for (i = 0; i < nstate; i++) { - CState *st = &state[i]; - int prev_ecnt = st->ecnt; - Command **commands; - - st->use_file = chooseScript(thread); - commands = sql_script[st->use_file].commands; - if (debug) - fprintf(stderr, "client %d executing script \"%s\"\n", st->id, - sql_script[st->use_file].desc); - if (!doCustom(thread, st, &aggs)) - remains--; /* I've aborted */ - - if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND) - { - fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n", - i, st->state); - remains--; /* I've aborted */ - PQfinish(st->con); - st->con = NULL; - } + state[i].state = CSTATE_CHOOSE_SCRIPT; } while (remains > 0) @@ -4222,59 +4440,60 @@ threadRun(void *arg) for (i = 0; i < nstate; i++) { CState *st = &state[i]; - Command **commands = sql_script[st->use_file].commands; int sock; - if (st->con == NULL) + if (st->state == CSTATE_THROTTLE && timer_exceeded) { + /* interrupt client which has not started a transaction */ + st->state = CSTATE_FINISHED; + remains--; + PQfinish(st->con); + st->con = NULL; continue; } - else if (st->sleeping) + else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE) { - if (st->throttling && timer_exceeded) + /* a nap from the script, or under throttling */ + int this_usec; + + if (min_usec == PG_INT64_MAX) { - /* interrupt client which has not started a transaction */ - remains--; - st->sleeping = false; - st->throttling = false; - PQfinish(st->con); - st->con = NULL; - continue; + instr_time now; + + INSTR_TIME_SET_CURRENT(now); + now_usec = INSTR_TIME_GET_MICROSEC(now); } - else /* just a nap from the script */ - { - int this_usec; - if (min_usec == PG_INT64_MAX) - { - instr_time now; - - INSTR_TIME_SET_CURRENT(now); - now_usec = INSTR_TIME_GET_MICROSEC(now); - } - - this_usec = st->txn_scheduled - now_usec; - if (min_usec > this_usec) - min_usec = this_usec; - } + this_usec = (st->state == CSTATE_SLEEP ? + st->sleep_until : st->txn_scheduled) - now_usec; + if (min_usec > this_usec) + min_usec = this_usec; } - else if (commands[st->state]->type == META_COMMAND) + else if (st->state == CSTATE_WAIT_RESULT) { - min_usec = 0; /* the connection is ready to run */ + /* + * waiting for result from server - nothing to do unless the + * socket is readable + */ + sock = PQsocket(st->con); + if (sock < 0) + { + fprintf(stderr, "invalid socket: %s", PQerrorMessage(st->con)); + goto done; + } + + FD_SET(sock, &input_mask); + + if (maxsock < sock) + maxsock = sock; break; } - - sock = PQsocket(st->con); - if (sock < 0) + else if (st->state != CSTATE_ABORTED && st->state != CSTATE_FINISHED) { - fprintf(stderr, "invalid socket: %s", PQerrorMessage(st->con)); - goto done; + /* the connection is ready to run */ + min_usec = 0; + break; } - - FD_SET(sock, &input_mask); - - if (maxsock < sock) - maxsock = sock; } /* also wake up to print the next progress report on time */ @@ -4324,14 +4543,13 @@ threadRun(void *arg) } } - /* ok, backend returns reply */ + /* ok, advance the state machine of each connection */ for (i = 0; i < nstate; i++) { CState *st = &state[i]; - Command **commands = sql_script[st->use_file].commands; - int prev_ecnt = st->ecnt; + bool ready; - if (st->con) + if (st->state == CSTATE_WAIT_RESULT && st->con) { int sock = PQsocket(st->con); @@ -4341,21 +4559,19 @@ threadRun(void *arg) PQerrorMessage(st->con)); goto done; } - if (FD_ISSET(sock, &input_mask) || - commands[st->state]->type == META_COMMAND) - { - if (!doCustom(thread, st, &aggs)) - remains--; /* I've aborted */ - } - } - if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND) + ready = FD_ISSET(sock, &input_mask); + } + else if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED) + ready = false; + else + ready = true; + + if (ready) { - fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n", - i, st->state); - remains--; /* I've aborted */ - PQfinish(st->con); - st->con = NULL; + doCustom(thread, st, &aggs); + if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED) + remains--; } }