diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 1fb4ae46d5..d44cfdab49 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -299,7 +299,7 @@ typedef enum */ CSTATE_ABORTED, CSTATE_FINISHED -} ConnectionStateEnum; +} ConnectionStateEnum; /* * Connection state. @@ -4420,43 +4420,43 @@ threadRun(void *arg) initStats(&aggs, INSTR_TIME_GET_DOUBLE(thread->start_time)); last = aggs; - /* initialize explicitely the state machines */ + /* explicitly initialize the state machines */ for (i = 0; i < nstate; i++) { state[i].state = CSTATE_CHOOSE_SCRIPT; } + /* loop till all clients have terminated */ while (remains > 0) { fd_set input_mask; - int maxsock; /* max socket number to be waited */ - int64 now_usec = 0; + int maxsock; /* max socket number to be waited for */ int64 min_usec; + int64 now_usec = 0; /* set this only if needed */ + /* identify which client sockets should be checked for input */ FD_ZERO(&input_mask); - maxsock = -1; min_usec = PG_INT64_MAX; for (i = 0; i < nstate; i++) { CState *st = &state[i]; - int sock; if (st->state == CSTATE_THROTTLE && timer_exceeded) { - /* interrupt client which has not started a transaction */ + /* interrupt client that has not started a transaction */ st->state = CSTATE_FINISHED; - remains--; PQfinish(st->con); st->con = NULL; - continue; + remains--; } else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE) { /* a nap from the script, or under throttling */ - int this_usec; + int64 this_usec; - if (min_usec == PG_INT64_MAX) + /* get current time if needed */ + if (now_usec == 0) { instr_time now; @@ -4464,6 +4464,7 @@ threadRun(void *arg) now_usec = INSTR_TIME_GET_MICROSEC(now); } + /* min_usec should be the minimum delay across all clients */ this_usec = (st->state == CSTATE_SLEEP ? st->sleep_until : st->txn_scheduled) - now_usec; if (min_usec > this_usec) @@ -4475,22 +4476,26 @@ threadRun(void *arg) * waiting for result from server - nothing to do unless the * socket is readable */ - sock = PQsocket(st->con); + int sock = PQsocket(st->con); + if (sock < 0) { - fprintf(stderr, "invalid socket: %s", PQerrorMessage(st->con)); + fprintf(stderr, "invalid socket: %s", + PQerrorMessage(st->con)); goto done; } FD_SET(sock, &input_mask); - if (maxsock < sock) maxsock = sock; - break; } - else if (st->state != CSTATE_ABORTED && st->state != CSTATE_FINISHED) + else if (st->state != CSTATE_ABORTED && + st->state != CSTATE_FINISHED) { - /* the connection is ready to run */ + /* + * This client thread is ready to do something, so we don't + * want to wait. No need to examine additional clients. + */ min_usec = 0; break; } @@ -4515,9 +4520,10 @@ threadRun(void *arg) } /* - * Sleep until we receive data from the server, or a nap-time - * specified in the script ends, or it's time to print a progress - * report. + * If no clients are ready to execute actions, sleep until we receive + * data from the server, or a nap-time specified in the script ends, + * or it's time to print a progress report. Update input_mask to show + * which client(s) received data. */ if (min_usec > 0 && maxsock != -1) { @@ -4536,21 +4542,29 @@ threadRun(void *arg) if (nsocks < 0) { if (errno == EINTR) + { + /* On EINTR, go back to top of loop */ continue; + } /* must be something wrong */ fprintf(stderr, "select() failed: %s\n", strerror(errno)); goto done; } } + else + { + /* If we didn't call select(), don't try to read any data */ + FD_ZERO(&input_mask); + } /* ok, advance the state machine of each connection */ for (i = 0; i < nstate; i++) { CState *st = &state[i]; - bool ready; - if (st->state == CSTATE_WAIT_RESULT && st->con) + if (st->state == CSTATE_WAIT_RESULT) { + /* don't call doCustom unless data is available */ int sock = PQsocket(st->con); if (sock < 0) @@ -4560,22 +4574,24 @@ threadRun(void *arg) goto done; } - ready = FD_ISSET(sock, &input_mask); + if (!FD_ISSET(sock, &input_mask)) + continue; } - else if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED) - ready = false; - else - ready = true; - - if (ready) + else if (st->state == CSTATE_FINISHED || + st->state == CSTATE_ABORTED) { - doCustom(thread, st, &aggs); - if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED) - remains--; + /* this client is done, no need to consider it anymore */ + continue; } + + doCustom(thread, st, &aggs); + + /* If doCustom changed client to finished state, reduce remains */ + if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED) + remains--; } - /* progress report by thread 0 for all threads */ + /* progress report is made by thread 0 for all threads */ if (progress && thread->tid == 0) { instr_time now_time;