postgresql/src/test/isolation/isolationtester.c

1143 lines
30 KiB
C

/*
* src/test/isolation/isolationtester.c
*
* isolationtester.c
* Runs an isolation test specified by a spec file.
*/
#include "postgres_fe.h"
#include <sys/select.h>
#include <sys/time.h>
#include "datatype/timestamp.h"
#include "isolationtester.h"
#include "libpq-fe.h"
#include "pg_getopt.h"
#include "pqexpbuffer.h"
#define PREP_WAITING "isolationtester_waiting"
/*
* conns[0] is the global setup, teardown, and watchdog connection. Additional
* connections represent spec-defined sessions.
*/
typedef struct IsoConnInfo
{
/* The libpq connection object for this connection. */
PGconn *conn;
/* The backend PID, in numeric and string formats. */
int backend_pid;
const char *backend_pid_str;
/* Name of the associated session. */
const char *sessionname;
/* Active step on this connection, or NULL if idle. */
PermutationStep *active_step;
/* Number of NOTICE messages received from connection. */
int total_notices;
} IsoConnInfo;
static IsoConnInfo *conns = NULL;
static int nconns = 0;
/* Flag indicating some new NOTICE has arrived */
static bool any_new_notice = false;
/* Maximum time to wait before giving up on a step (in usec) */
static int64 max_step_wait = 360 * USECS_PER_SEC;
static void check_testspec(TestSpec *testspec);
static void run_testspec(TestSpec *testspec);
static void run_all_permutations(TestSpec *testspec);
static void run_all_permutations_recurse(TestSpec *testspec, int *piles,
int nsteps, PermutationStep **steps);
static void run_named_permutations(TestSpec *testspec);
static void run_permutation(TestSpec *testspec, int nsteps,
PermutationStep **steps);
/* Flag bits for try_complete_step(s) */
#define STEP_NONBLOCK 0x1 /* return as soon as cmd waits for a lock */
#define STEP_RETRY 0x2 /* this is a retry of a previously-waiting cmd */
static int try_complete_steps(TestSpec *testspec, PermutationStep **waiting,
int nwaiting, int flags);
static bool try_complete_step(TestSpec *testspec, PermutationStep *pstep,
int flags);
static int step_qsort_cmp(const void *a, const void *b);
static int step_bsearch_cmp(const void *a, const void *b);
static bool step_has_blocker(PermutationStep *pstep);
static void printResultSet(PGresult *res);
static void isotesterNoticeProcessor(void *arg, const char *message);
static void blackholeNoticeProcessor(void *arg, const char *message);
static void
disconnect_atexit(void)
{
int i;
for (i = 0; i < nconns; i++)
if (conns[i].conn)
PQfinish(conns[i].conn);
}
int
main(int argc, char **argv)
{
const char *conninfo;
const char *env_wait;
TestSpec *testspec;
PGresult *res;
PQExpBufferData wait_query;
int opt;
int i;
while ((opt = getopt(argc, argv, "V")) != -1)
{
switch (opt)
{
case 'V':
puts("isolationtester (PostgreSQL) " PG_VERSION);
exit(0);
default:
fprintf(stderr, "Usage: isolationtester [CONNINFO]\n");
return EXIT_FAILURE;
}
}
/*
* Make stdout unbuffered to match stderr; and ensure stderr is unbuffered
* too, which it should already be everywhere except sometimes in Windows.
*/
setbuf(stdout, NULL);
setbuf(stderr, NULL);
/*
* If the user supplies a non-option parameter on the command line, use it
* as the conninfo string; otherwise default to setting dbname=postgres
* and using environment variables or defaults for all other connection
* parameters.
*/
if (argc > optind)
conninfo = argv[optind];
else
conninfo = "dbname = postgres";
/*
* If PG_TEST_TIMEOUT_DEFAULT is set, adopt its value (given in seconds)
* as half the max time to wait for any one step to complete.
*/
env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
if (env_wait != NULL)
max_step_wait = 2 * ((int64) atoi(env_wait)) * USECS_PER_SEC;
/* Read the test spec from stdin */
spec_yyparse();
testspec = &parseresult;
/* Perform post-parse checking, and fill in linking fields */
check_testspec(testspec);
printf("Parsed test spec with %d sessions\n", testspec->nsessions);
/*
* Establish connections to the database, one for each session and an
* extra for lock wait detection and global work.
*/
nconns = 1 + testspec->nsessions;
conns = (IsoConnInfo *) pg_malloc0(nconns * sizeof(IsoConnInfo));
atexit(disconnect_atexit);
for (i = 0; i < nconns; i++)
{
const char *sessionname;
if (i == 0)
sessionname = "control connection";
else
sessionname = testspec->sessions[i - 1]->name;
conns[i].sessionname = sessionname;
conns[i].conn = PQconnectdb(conninfo);
if (PQstatus(conns[i].conn) != CONNECTION_OK)
{
fprintf(stderr, "Connection %d failed: %s",
i, PQerrorMessage(conns[i].conn));
exit(1);
}
/*
* Set up notice processors for the user-defined connections, so that
* messages can get printed prefixed with the session names. The
* control connection gets a "blackhole" processor instead (hides all
* messages).
*/
if (i != 0)
PQsetNoticeProcessor(conns[i].conn,
isotesterNoticeProcessor,
(void *) &conns[i]);
else
PQsetNoticeProcessor(conns[i].conn,
blackholeNoticeProcessor,
NULL);
/*
* Similarly, append the session name to application_name to make it
* easier to map spec file sessions to log output and
* pg_stat_activity. The reason to append instead of just setting the
* name is that we don't know the name of the test currently running.
*/
res = PQexecParams(conns[i].conn,
"SELECT set_config('application_name',\n"
" current_setting('application_name') || '/' || $1,\n"
" false)",
1, NULL,
&sessionname,
NULL, NULL, 0);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
fprintf(stderr, "setting of application name failed: %s",
PQerrorMessage(conns[i].conn));
exit(1);
}
/* Save each connection's backend PID for subsequent use. */
conns[i].backend_pid = PQbackendPID(conns[i].conn);
conns[i].backend_pid_str = psprintf("%d", conns[i].backend_pid);
}
/*
* Build the query we'll use to detect lock contention among sessions in
* the test specification. Most of the time, we could get away with
* simply checking whether a session is waiting for *any* lock: we don't
* exactly expect concurrent use of test tables. However, autovacuum will
* occasionally take AccessExclusiveLock to truncate a table, and we must
* ignore that transient wait.
*/
initPQExpBuffer(&wait_query);
appendPQExpBufferStr(&wait_query,
"SELECT pg_catalog.pg_isolation_test_session_is_blocked($1, '{");
/* The spec syntax requires at least one session; assume that here. */
appendPQExpBufferStr(&wait_query, conns[1].backend_pid_str);
for (i = 2; i < nconns; i++)
appendPQExpBuffer(&wait_query, ",%s", conns[i].backend_pid_str);
appendPQExpBufferStr(&wait_query, "}')");
res = PQprepare(conns[0].conn, PREP_WAITING, wait_query.data, 0, NULL);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "prepare of lock wait query failed: %s",
PQerrorMessage(conns[0].conn));
exit(1);
}
PQclear(res);
termPQExpBuffer(&wait_query);
/*
* Run the permutations specified in the spec, or all if none were
* explicitly specified.
*/
run_testspec(testspec);
return 0;
}
/*
* Validity-check the test spec and fill in cross-links between nodes.
*/
static void
check_testspec(TestSpec *testspec)
{
int nallsteps;
Step **allsteps;
int i,
j,
k;
/* Create a sorted lookup table of all steps. */
nallsteps = 0;
for (i = 0; i < testspec->nsessions; i++)
nallsteps += testspec->sessions[i]->nsteps;
allsteps = pg_malloc(nallsteps * sizeof(Step *));
k = 0;
for (i = 0; i < testspec->nsessions; i++)
{
for (j = 0; j < testspec->sessions[i]->nsteps; j++)
allsteps[k++] = testspec->sessions[i]->steps[j];
}
qsort(allsteps, nallsteps, sizeof(Step *), step_qsort_cmp);
/* Verify that all step names are unique. */
for (i = 1; i < nallsteps; i++)
{
if (strcmp(allsteps[i - 1]->name,
allsteps[i]->name) == 0)
{
fprintf(stderr, "duplicate step name: %s\n",
allsteps[i]->name);
exit(1);
}
}
/* Set the session index fields in steps. */
for (i = 0; i < testspec->nsessions; i++)
{
Session *session = testspec->sessions[i];
for (j = 0; j < session->nsteps; j++)
session->steps[j]->session = i;
}
/*
* If we have manually-specified permutations, link PermutationSteps to
* Steps, and fill in blocker links.
*/
for (i = 0; i < testspec->npermutations; i++)
{
Permutation *p = testspec->permutations[i];
for (j = 0; j < p->nsteps; j++)
{
PermutationStep *pstep = p->steps[j];
Step **this = (Step **) bsearch(pstep->name,
allsteps,
nallsteps,
sizeof(Step *),
step_bsearch_cmp);
if (this == NULL)
{
fprintf(stderr, "undefined step \"%s\" specified in permutation\n",
pstep->name);
exit(1);
}
pstep->step = *this;
/* Mark the step used, for check below */
pstep->step->used = true;
}
/*
* Identify any blocker steps. We search only the current
* permutation, since steps not used there couldn't be concurrent.
* Note that it's OK to reference later permutation steps, so this
* can't be combined with the previous loop.
*/
for (j = 0; j < p->nsteps; j++)
{
PermutationStep *pstep = p->steps[j];
for (k = 0; k < pstep->nblockers; k++)
{
PermutationStepBlocker *blocker = pstep->blockers[k];
int n;
if (blocker->blocktype == PSB_ONCE)
continue; /* nothing to link to */
blocker->step = NULL;
for (n = 0; n < p->nsteps; n++)
{
PermutationStep *otherp = p->steps[n];
if (strcmp(otherp->name, blocker->stepname) == 0)
{
blocker->step = otherp->step;
break;
}
}
if (blocker->step == NULL)
{
fprintf(stderr, "undefined blocking step \"%s\" referenced in permutation step \"%s\"\n",
blocker->stepname, pstep->name);
exit(1);
}
/* can't block on completion of step of own session */
if (blocker->step->session == pstep->step->session)
{
fprintf(stderr, "permutation step \"%s\" cannot block on its own session\n",
pstep->name);
exit(1);
}
}
}
}
/*
* If we have manually-specified permutations, verify that all steps have
* been used, warning about anything defined but not used. We can skip
* this when using automatically-generated permutations.
*/
if (testspec->permutations)
{
for (i = 0; i < nallsteps; i++)
{
if (!allsteps[i]->used)
fprintf(stderr, "unused step name: %s\n", allsteps[i]->name);
}
}
free(allsteps);
}
/*
* Run the permutations specified in the spec, or all if none were
* explicitly specified.
*/
static void
run_testspec(TestSpec *testspec)
{
if (testspec->permutations)
run_named_permutations(testspec);
else
run_all_permutations(testspec);
}
/*
* Run all permutations of the steps and sessions.
*/
static void
run_all_permutations(TestSpec *testspec)
{
int nsteps;
int i;
PermutationStep *steps;
PermutationStep **stepptrs;
int *piles;
/* Count the total number of steps in all sessions */
nsteps = 0;
for (i = 0; i < testspec->nsessions; i++)
nsteps += testspec->sessions[i]->nsteps;
/* Create PermutationStep workspace array */
steps = (PermutationStep *) pg_malloc0(sizeof(PermutationStep) * nsteps);
stepptrs = (PermutationStep **) pg_malloc(sizeof(PermutationStep *) * nsteps);
for (i = 0; i < nsteps; i++)
stepptrs[i] = steps + i;
/*
* To generate the permutations, we conceptually put the steps of each
* session on a pile. To generate a permutation, we pick steps from the
* piles until all piles are empty. By picking steps from piles in
* different order, we get different permutations.
*
* A pile is actually just an integer which tells how many steps we've
* already picked from this pile.
*/
piles = pg_malloc(sizeof(int) * testspec->nsessions);
for (i = 0; i < testspec->nsessions; i++)
piles[i] = 0;
run_all_permutations_recurse(testspec, piles, 0, stepptrs);
free(steps);
free(stepptrs);
free(piles);
}
static void
run_all_permutations_recurse(TestSpec *testspec, int *piles,
int nsteps, PermutationStep **steps)
{
int i;
bool found = false;
for (i = 0; i < testspec->nsessions; i++)
{
/* If there's any more steps in this pile, pick it and recurse */
if (piles[i] < testspec->sessions[i]->nsteps)
{
Step *newstep = testspec->sessions[i]->steps[piles[i]];
/*
* These automatically-generated PermutationSteps never have
* blocker conditions. So we need only fill these fields, relying
* on run_all_permutations() to have zeroed the rest:
*/
steps[nsteps]->name = newstep->name;
steps[nsteps]->step = newstep;
piles[i]++;
run_all_permutations_recurse(testspec, piles, nsteps + 1, steps);
piles[i]--;
found = true;
}
}
/* If all the piles were empty, this permutation is completed. Run it */
if (!found)
run_permutation(testspec, nsteps, steps);
}
/*
* Run permutations given in the test spec
*/
static void
run_named_permutations(TestSpec *testspec)
{
int i;
for (i = 0; i < testspec->npermutations; i++)
{
Permutation *p = testspec->permutations[i];
run_permutation(testspec, p->nsteps, p->steps);
}
}
static int
step_qsort_cmp(const void *a, const void *b)
{
Step *stepa = *((Step **) a);
Step *stepb = *((Step **) b);
return strcmp(stepa->name, stepb->name);
}
static int
step_bsearch_cmp(const void *a, const void *b)
{
char *stepname = (char *) a;
Step *step = *((Step **) b);
return strcmp(stepname, step->name);
}
/*
* Run one permutation
*/
static void
run_permutation(TestSpec *testspec, int nsteps, PermutationStep **steps)
{
PGresult *res;
int i;
int nwaiting = 0;
PermutationStep **waiting;
waiting = pg_malloc(sizeof(PermutationStep *) * testspec->nsessions);
printf("\nstarting permutation:");
for (i = 0; i < nsteps; i++)
printf(" %s", steps[i]->name);
printf("\n");
/* Perform setup */
for (i = 0; i < testspec->nsetupsqls; i++)
{
res = PQexec(conns[0].conn, testspec->setupsqls[i]);
if (PQresultStatus(res) == PGRES_TUPLES_OK)
{
printResultSet(res);
}
else if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "setup failed: %s", PQerrorMessage(conns[0].conn));
exit(1);
}
PQclear(res);
}
/* Perform per-session setup */
for (i = 0; i < testspec->nsessions; i++)
{
if (testspec->sessions[i]->setupsql)
{
res = PQexec(conns[i + 1].conn, testspec->sessions[i]->setupsql);
if (PQresultStatus(res) == PGRES_TUPLES_OK)
{
printResultSet(res);
}
else if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "setup of session %s failed: %s",
conns[i + 1].sessionname,
PQerrorMessage(conns[i + 1].conn));
exit(1);
}
PQclear(res);
}
}
/* Perform steps */
for (i = 0; i < nsteps; i++)
{
PermutationStep *pstep = steps[i];
Step *step = pstep->step;
IsoConnInfo *iconn = &conns[1 + step->session];
PGconn *conn = iconn->conn;
bool mustwait;
int j;
/*
* Check whether the session that needs to perform the next step is
* still blocked on an earlier step. If so, wait for it to finish.
*/
if (iconn->active_step != NULL)
{
struct timeval start_time;
gettimeofday(&start_time, NULL);
while (iconn->active_step != NULL)
{
PermutationStep *oldstep = iconn->active_step;
/*
* Wait for oldstep. But even though we don't use
* STEP_NONBLOCK, it might not complete because of blocker
* conditions.
*/
if (!try_complete_step(testspec, oldstep, STEP_RETRY))
{
/* Done, so remove oldstep from the waiting[] array. */
int w;
for (w = 0; w < nwaiting; w++)
{
if (oldstep == waiting[w])
break;
}
if (w >= nwaiting)
abort(); /* can't happen */
if (w + 1 < nwaiting)
memmove(&waiting[w], &waiting[w + 1],
(nwaiting - (w + 1)) * sizeof(PermutationStep *));
nwaiting--;
}
/*
* Check for other steps that have finished. We should do
* this if oldstep completed, as it might have unblocked
* something. On the other hand, if oldstep hasn't completed,
* we must poll all the active steps in hopes of unblocking
* oldstep. So either way, poll them.
*/
nwaiting = try_complete_steps(testspec, waiting, nwaiting,
STEP_NONBLOCK | STEP_RETRY);
/*
* If the target session is still busy, apply a timeout to
* keep from hanging indefinitely, which could happen with
* incorrect blocker annotations. Use the same 2 *
* max_step_wait limit as try_complete_step does for deciding
* to die. (We don't bother with trying to cancel anything,
* since it's unclear what to cancel in this case.)
*/
if (iconn->active_step != NULL)
{
struct timeval current_time;
int64 td;
gettimeofday(&current_time, NULL);
td = (int64) current_time.tv_sec - (int64) start_time.tv_sec;
td *= USECS_PER_SEC;
td += (int64) current_time.tv_usec - (int64) start_time.tv_usec;
if (td > 2 * max_step_wait)
{
fprintf(stderr, "step %s timed out after %d seconds\n",
iconn->active_step->name,
(int) (td / USECS_PER_SEC));
fprintf(stderr, "active steps are:");
for (j = 1; j < nconns; j++)
{
IsoConnInfo *oconn = &conns[j];
if (oconn->active_step != NULL)
fprintf(stderr, " %s",
oconn->active_step->name);
}
fprintf(stderr, "\n");
exit(1);
}
}
}
}
/* Send the query for this step. */
if (!PQsendQuery(conn, step->sql))
{
fprintf(stdout, "failed to send query for step %s: %s\n",
step->name, PQerrorMessage(conn));
exit(1);
}
/* Remember we launched a step. */
iconn->active_step = pstep;
/* Remember target number of NOTICEs for any blocker conditions. */
for (j = 0; j < pstep->nblockers; j++)
{
PermutationStepBlocker *blocker = pstep->blockers[j];
if (blocker->blocktype == PSB_NUM_NOTICES)
blocker->target_notices = blocker->num_notices +
conns[blocker->step->session + 1].total_notices;
}
/* Try to complete this step without blocking. */
mustwait = try_complete_step(testspec, pstep, STEP_NONBLOCK);
/* Check for completion of any steps that were previously waiting. */
nwaiting = try_complete_steps(testspec, waiting, nwaiting,
STEP_NONBLOCK | STEP_RETRY);
/* If this step is waiting, add it to the array of waiters. */
if (mustwait)
waiting[nwaiting++] = pstep;
}
/* Wait for any remaining queries. */
nwaiting = try_complete_steps(testspec, waiting, nwaiting, STEP_RETRY);
if (nwaiting != 0)
{
fprintf(stderr, "failed to complete permutation due to mutually-blocking steps\n");
exit(1);
}
/* Perform per-session teardown */
for (i = 0; i < testspec->nsessions; i++)
{
if (testspec->sessions[i]->teardownsql)
{
res = PQexec(conns[i + 1].conn, testspec->sessions[i]->teardownsql);
if (PQresultStatus(res) == PGRES_TUPLES_OK)
{
printResultSet(res);
}
else if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "teardown of session %s failed: %s",
conns[i + 1].sessionname,
PQerrorMessage(conns[i + 1].conn));
/* don't exit on teardown failure */
}
PQclear(res);
}
}
/* Perform teardown */
if (testspec->teardownsql)
{
res = PQexec(conns[0].conn, testspec->teardownsql);
if (PQresultStatus(res) == PGRES_TUPLES_OK)
{
printResultSet(res);
}
else if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "teardown failed: %s",
PQerrorMessage(conns[0].conn));
/* don't exit on teardown failure */
}
PQclear(res);
}
free(waiting);
}
/*
* Check for completion of any waiting step(s).
* Remove completed ones from the waiting[] array,
* and return the new value of nwaiting.
* See try_complete_step for the meaning of the flags.
*/
static int
try_complete_steps(TestSpec *testspec, PermutationStep **waiting,
int nwaiting, int flags)
{
int old_nwaiting;
bool have_blocker;
do
{
int w = 0;
/* Reset latch; we only care about notices received within loop. */
any_new_notice = false;
/* Likewise, these variables reset for each retry. */
old_nwaiting = nwaiting;
have_blocker = false;
/* Scan the array, try to complete steps. */
while (w < nwaiting)
{
if (try_complete_step(testspec, waiting[w], flags))
{
/* Still blocked, leave it alone. */
if (waiting[w]->nblockers > 0)
have_blocker = true;
w++;
}
else
{
/* Done, remove it from array. */
if (w + 1 < nwaiting)
memmove(&waiting[w], &waiting[w + 1],
(nwaiting - (w + 1)) * sizeof(PermutationStep *));
nwaiting--;
}
}
/*
* If any of the still-waiting steps have blocker conditions attached,
* it's possible that one of the steps we examined afterwards has
* released them (either by completing, or by sending a NOTICE). If
* any step completions or NOTICEs happened, repeat the loop until
* none occurs. Without this provision, completion timing could vary
* depending on the order in which the steps appear in the array.
*/
} while (have_blocker && (nwaiting < old_nwaiting || any_new_notice));
return nwaiting;
}
/*
* Our caller already sent the query associated with this step. Wait for it
* to either complete, or hit a blocking condition.
*
* When calling this function on behalf of a given step for a second or later
* time, pass the STEP_RETRY flag. Do not pass it on the first call.
*
* Returns true if the step was *not* completed, false if it was completed.
* Reasons for non-completion are (a) the STEP_NONBLOCK flag was specified
* and the query is waiting to acquire a lock, or (b) the step has an
* unsatisfied blocker condition. When STEP_NONBLOCK is given, we assume
* that any lock wait will persist until we have executed additional steps.
*/
static bool
try_complete_step(TestSpec *testspec, PermutationStep *pstep, int flags)
{
Step *step = pstep->step;
IsoConnInfo *iconn = &conns[1 + step->session];
PGconn *conn = iconn->conn;
fd_set read_set;
struct timeval start_time;
struct timeval timeout;
int sock = PQsocket(conn);
int ret;
PGresult *res;
PGnotify *notify;
bool canceled = false;
/*
* If the step is annotated with (*), then on the first call, force it to
* wait. This is useful for ensuring consistent output when the step
* might or might not complete so fast that we don't observe it waiting.
*/
if (!(flags & STEP_RETRY))
{
int i;
for (i = 0; i < pstep->nblockers; i++)
{
PermutationStepBlocker *blocker = pstep->blockers[i];
if (blocker->blocktype == PSB_ONCE)
{
printf("step %s: %s <waiting ...>\n",
step->name, step->sql);
return true;
}
}
}
if (sock < 0)
{
fprintf(stderr, "invalid socket: %s", PQerrorMessage(conn));
exit(1);
}
gettimeofday(&start_time, NULL);
FD_ZERO(&read_set);
while (PQisBusy(conn))
{
FD_SET(sock, &read_set);
timeout.tv_sec = 0;
timeout.tv_usec = 10000; /* Check for lock waits every 10ms. */
ret = select(sock + 1, &read_set, NULL, NULL, &timeout);
if (ret < 0) /* error in select() */
{
if (errno == EINTR)
continue;
fprintf(stderr, "select failed: %m\n");
exit(1);
}
else if (ret == 0) /* select() timeout: check for lock wait */
{
struct timeval current_time;
int64 td;
/* If it's OK for the step to block, check whether it has. */
if (flags & STEP_NONBLOCK)
{
bool waiting;
res = PQexecPrepared(conns[0].conn, PREP_WAITING, 1,
&conns[step->session + 1].backend_pid_str,
NULL, NULL, 0);
if (PQresultStatus(res) != PGRES_TUPLES_OK ||
PQntuples(res) != 1)
{
fprintf(stderr, "lock wait query failed: %s",
PQerrorMessage(conns[0].conn));
exit(1);
}
waiting = ((PQgetvalue(res, 0, 0))[0] == 't');
PQclear(res);
if (waiting) /* waiting to acquire a lock */
{
/*
* Since it takes time to perform the lock-check query,
* some data --- notably, NOTICE messages --- might have
* arrived since we looked. We must call PQconsumeInput
* and then PQisBusy to collect and process any such
* messages. In the (unlikely) case that PQisBusy then
* returns false, we might as well go examine the
* available result.
*/
if (!PQconsumeInput(conn))
{
fprintf(stderr, "PQconsumeInput failed: %s\n",
PQerrorMessage(conn));
exit(1);
}
if (!PQisBusy(conn))
break;
/*
* conn is still busy, so conclude that the step really is
* waiting.
*/
if (!(flags & STEP_RETRY))
printf("step %s: %s <waiting ...>\n",
step->name, step->sql);
return true;
}
/* else, not waiting */
}
/* Figure out how long we've been waiting for this step. */
gettimeofday(&current_time, NULL);
td = (int64) current_time.tv_sec - (int64) start_time.tv_sec;
td *= USECS_PER_SEC;
td += (int64) current_time.tv_usec - (int64) start_time.tv_usec;
/*
* After max_step_wait microseconds, try to cancel the query.
*
* If the user tries to test an invalid permutation, we don't want
* to hang forever, especially when this is running in the
* buildfarm. This will presumably lead to this permutation
* failing, but remaining permutations and tests should still be
* OK.
*/
if (td > max_step_wait && !canceled)
{
PGcancelConn *cancel_conn = PQcancelCreate(conn);
if (PQcancelBlocking(cancel_conn))
{
/*
* print to stdout not stderr, as this should appear in
* the test case's results
*/
printf("isolationtester: canceling step %s after %d seconds\n",
step->name, (int) (td / USECS_PER_SEC));
canceled = true;
}
else
fprintf(stderr, "PQcancel failed: %s\n", PQcancelErrorMessage(cancel_conn));
PQcancelFinish(cancel_conn);
}
/*
* After twice max_step_wait, just give up and die.
*
* Since cleanup steps won't be run in this case, this may cause
* later tests to fail. That stinks, but it's better than waiting
* forever for the server to respond to the cancel.
*/
if (td > 2 * max_step_wait)
{
fprintf(stderr, "step %s timed out after %d seconds\n",
step->name, (int) (td / USECS_PER_SEC));
exit(1);
}
}
else if (!PQconsumeInput(conn)) /* select(): data available */
{
fprintf(stderr, "PQconsumeInput failed: %s\n",
PQerrorMessage(conn));
exit(1);
}
}
/*
* The step is done, but we won't report it as complete so long as there
* are blockers.
*/
if (step_has_blocker(pstep))
{
if (!(flags & STEP_RETRY))
printf("step %s: %s <waiting ...>\n",
step->name, step->sql);
return true;
}
/* Otherwise, go ahead and complete it. */
if (flags & STEP_RETRY)
printf("step %s: <... completed>\n", step->name);
else
printf("step %s: %s\n", step->name, step->sql);
while ((res = PQgetResult(conn)))
{
switch (PQresultStatus(res))
{
case PGRES_COMMAND_OK:
case PGRES_EMPTY_QUERY:
break;
case PGRES_TUPLES_OK:
printResultSet(res);
break;
case PGRES_FATAL_ERROR:
/*
* Detail may contain XID values, so we want to just show
* primary. Beware however that libpq-generated error results
* may not contain subfields, only an old-style message.
*/
{
const char *sev = PQresultErrorField(res,
PG_DIAG_SEVERITY);
const char *msg = PQresultErrorField(res,
PG_DIAG_MESSAGE_PRIMARY);
if (sev && msg)
printf("%s: %s\n", sev, msg);
else
printf("%s\n", PQresultErrorMessage(res));
}
break;
default:
printf("unexpected result status: %s\n",
PQresStatus(PQresultStatus(res)));
}
PQclear(res);
}
/* Report any available NOTIFY messages, too */
PQconsumeInput(conn);
while ((notify = PQnotifies(conn)) != NULL)
{
/* Try to identify which session it came from */
const char *sendername = NULL;
char pidstring[32];
int i;
for (i = 0; i < testspec->nsessions; i++)
{
if (notify->be_pid == conns[i + 1].backend_pid)
{
sendername = conns[i + 1].sessionname;
break;
}
}
if (sendername == NULL)
{
/* Doesn't seem to be any test session, so show the hard way */
snprintf(pidstring, sizeof(pidstring), "PID %d", notify->be_pid);
sendername = pidstring;
}
printf("%s: NOTIFY \"%s\" with payload \"%s\" from %s\n",
testspec->sessions[step->session]->name,
notify->relname, notify->extra, sendername);
PQfreemem(notify);
PQconsumeInput(conn);
}
/* Connection is now idle. */
iconn->active_step = NULL;
return false;
}
/* Detect whether a step has any unsatisfied blocker conditions */
static bool
step_has_blocker(PermutationStep *pstep)
{
int i;
for (i = 0; i < pstep->nblockers; i++)
{
PermutationStepBlocker *blocker = pstep->blockers[i];
IsoConnInfo *iconn;
switch (blocker->blocktype)
{
case PSB_ONCE:
/* Ignore; try_complete_step handles this specially */
break;
case PSB_OTHER_STEP:
/* Block if referenced step is active */
iconn = &conns[1 + blocker->step->session];
if (iconn->active_step &&
iconn->active_step->step == blocker->step)
return true;
break;
case PSB_NUM_NOTICES:
/* Block if not enough notices received yet */
iconn = &conns[1 + blocker->step->session];
if (iconn->total_notices < blocker->target_notices)
return true;
break;
}
}
return false;
}
static void
printResultSet(PGresult *res)
{
PQprintOpt popt;
memset(&popt, 0, sizeof(popt));
popt.header = true;
popt.align = true;
popt.fieldSep = "|";
PQprint(stdout, res, &popt);
}
/* notice processor for regular user sessions */
static void
isotesterNoticeProcessor(void *arg, const char *message)
{
IsoConnInfo *myconn = (IsoConnInfo *) arg;
/* Prefix the backend's message with the session name. */
printf("%s: %s", myconn->sessionname, message);
/* Record notices, since we may need this to decide to unblock a step. */
myconn->total_notices++;
any_new_notice = true;
}
/* notice processor, hides the message */
static void
blackholeNoticeProcessor(void *arg, const char *message)
{
/* do nothing */
}