diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index c1b1269672..b13ddab393 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -1375,8 +1375,7 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery) /* OK, it's launched! */ pqAppendCmdQueueEntry(conn, entry); - if (conn->pipelineStatus == PQ_PIPELINE_OFF) - conn->asyncStatus = PGASYNC_BUSY; + conn->asyncStatus = PGASYNC_BUSY; return 1; sendFailed: @@ -1513,8 +1512,7 @@ PQsendPrepare(PGconn *conn, pqAppendCmdQueueEntry(conn, entry); - if (conn->pipelineStatus == PQ_PIPELINE_OFF) - conn->asyncStatus = PGASYNC_BUSY; + conn->asyncStatus = PGASYNC_BUSY; /* * Give the data a push (in pipeline mode, only if we're past the size @@ -1817,8 +1815,7 @@ PQsendQueryGuts(PGconn *conn, /* OK, it's launched! */ pqAppendCmdQueueEntry(conn, entry); - if (conn->pipelineStatus == PQ_PIPELINE_OFF) - conn->asyncStatus = PGASYNC_BUSY; + conn->asyncStatus = PGASYNC_BUSY; return 1; sendFailed: @@ -2448,8 +2445,7 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target) /* OK, it's launched! */ pqAppendCmdQueueEntry(conn, entry); - if (conn->pipelineStatus == PQ_PIPELINE_OFF) - conn->asyncStatus = PGASYNC_BUSY; + conn->asyncStatus = PGASYNC_BUSY; return 1; sendFailed: @@ -3084,12 +3080,7 @@ PQpipelineSync(PGconn *conn) */ if (PQflush(conn) < 0) goto sendFailed; - - /* - * Call pqPipelineProcessQueue so the user can call start calling - * PQgetResult. - */ - pqPipelineProcessQueue(conn); + conn->asyncStatus = PGASYNC_BUSY; return 1; diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c index 71eedb6dbb..249ee22105 100644 --- a/src/test/modules/libpq_pipeline/libpq_pipeline.c +++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c @@ -230,6 +230,93 @@ test_multi_pipelines(PGconn *conn) fprintf(stderr, "ok\n"); } +/* + * Test behavior when a pipeline dispatches a number of commands that are + * not flushed by a sync point. + */ +static void +test_nosync(PGconn *conn) +{ + int numqueries = 10; + int results = 0; + int sock = PQsocket(conn); + + fprintf(stderr, "nosync... "); + + if (sock < 0) + pg_fatal("invalid socket"); + + if (PQenterPipelineMode(conn) != 1) + pg_fatal("could not enter pipeline mode"); + for (int i = 0; i < numqueries; i++) + { + fd_set input_mask; + struct timeval tv; + + if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)", + 0, NULL, NULL, NULL, NULL, 0) != 1) + pg_fatal("error sending select: %s", PQerrorMessage(conn)); + PQflush(conn); + + /* + * If the server has written anything to us, read (some of) it now. + */ + FD_ZERO(&input_mask); + FD_SET(sock, &input_mask); + tv.tv_sec = 0; + tv.tv_usec = 0; + if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0) + { + fprintf(stderr, "select() failed: %s\n", strerror(errno)); + exit_nicely(conn); + } + if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1) + pg_fatal("failed to read from server: %s", PQerrorMessage(conn)); + } + + /* tell server to flush its output buffer */ + if (PQsendFlushRequest(conn) != 1) + pg_fatal("failed to send flush request"); + PQflush(conn); + + /* Now read all results */ + for (;;) + { + PGresult *res; + + res = PQgetResult(conn); + + /* NULL results are only expected after TUPLES_OK */ + if (res == NULL) + pg_fatal("got unexpected NULL result after %d results", results); + + /* We expect exactly one TUPLES_OK result for each query we sent */ + if (PQresultStatus(res) == PGRES_TUPLES_OK) + { + PGresult *res2; + + /* and one NULL result should follow each */ + res2 = PQgetResult(conn); + if (res2 != NULL) + pg_fatal("expected NULL, got %s", + PQresStatus(PQresultStatus(res2))); + PQclear(res); + results++; + + /* if we're done, we're done */ + if (results == numqueries) + break; + + continue; + } + + /* anything else is unexpected */ + pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res))); + } + + fprintf(stderr, "ok\n"); +} + /* * When an operation in a pipeline fails the rest of the pipeline is flushed. We * still have to get results for each pipeline item, but the item will just be @@ -1237,6 +1324,7 @@ print_test_list(void) { printf("disallowed_in_pipeline\n"); printf("multi_pipelines\n"); + printf("nosync\n"); printf("pipeline_abort\n"); printf("pipelined_insert\n"); printf("prepared\n"); @@ -1334,6 +1422,8 @@ main(int argc, char **argv) test_disallowed_in_pipeline(conn); else if (strcmp(testname, "multi_pipelines") == 0) test_multi_pipelines(conn); + else if (strcmp(testname, "nosync") == 0) + test_nosync(conn); else if (strcmp(testname, "pipeline_abort") == 0) test_pipeline_abort(conn); else if (strcmp(testname, "pipelined_insert") == 0) diff --git a/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl b/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl index 2bc0e6c223..4101ef950e 100644 --- a/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl +++ b/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl @@ -26,7 +26,7 @@ for my $testname (@tests) { my @extraargs = ('-r', $numrows); my $cmptrace = grep(/^$testname$/, - qw(simple_pipeline multi_pipelines prepared singlerow + qw(simple_pipeline nosync multi_pipelines prepared singlerow pipeline_abort transaction disallowed_in_pipeline)) > 0; # For a bunch of tests, generate a libpq trace file too. diff --git a/src/test/modules/libpq_pipeline/traces/nosync.trace b/src/test/modules/libpq_pipeline/traces/nosync.trace new file mode 100644 index 0000000000..d99aac649d --- /dev/null +++ b/src/test/modules/libpq_pipeline/traces/nosync.trace @@ -0,0 +1,92 @@ +F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0 +F 14 Bind "" "" 0 0 1 0 +F 6 Describe P "" +F 9 Execute "" 0 +F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0 +F 14 Bind "" "" 0 0 1 0 +F 6 Describe P "" +F 9 Execute "" 0 +F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0 +F 14 Bind "" "" 0 0 1 0 +F 6 Describe P "" +F 9 Execute "" 0 +F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0 +F 14 Bind "" "" 0 0 1 0 +F 6 Describe P "" +F 9 Execute "" 0 +F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0 +F 14 Bind "" "" 0 0 1 0 +F 6 Describe P "" +F 9 Execute "" 0 +F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0 +F 14 Bind "" "" 0 0 1 0 +F 6 Describe P "" +F 9 Execute "" 0 +F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0 +F 14 Bind "" "" 0 0 1 0 +F 6 Describe P "" +F 9 Execute "" 0 +F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0 +F 14 Bind "" "" 0 0 1 0 +F 6 Describe P "" +F 9 Execute "" 0 +F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0 +F 14 Bind "" "" 0 0 1 0 +F 6 Describe P "" +F 9 Execute "" 0 +F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0 +F 14 Bind "" "" 0 0 1 0 +F 6 Describe P "" +F 9 Execute "" 0 +F 4 Flush +B 4 ParseComplete +B 4 BindComplete +B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0 +B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz' +B 13 CommandComplete "SELECT 1" +B 4 ParseComplete +B 4 BindComplete +B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0 +B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz' +B 13 CommandComplete "SELECT 1" +B 4 ParseComplete +B 4 BindComplete +B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0 +B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz' +B 13 CommandComplete "SELECT 1" +B 4 ParseComplete +B 4 BindComplete +B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0 +B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz' +B 13 CommandComplete "SELECT 1" +B 4 ParseComplete +B 4 BindComplete +B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0 +B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz' +B 13 CommandComplete "SELECT 1" +B 4 ParseComplete +B 4 BindComplete +B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0 +B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz' +B 13 CommandComplete "SELECT 1" +B 4 ParseComplete +B 4 BindComplete +B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0 +B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz' +B 13 CommandComplete "SELECT 1" +B 4 ParseComplete +B 4 BindComplete +B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0 +B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz' +B 13 CommandComplete "SELECT 1" +B 4 ParseComplete +B 4 BindComplete +B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0 +B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz' +B 13 CommandComplete "SELECT 1" +B 4 ParseComplete +B 4 BindComplete +B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0 +B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz' +B 13 CommandComplete "SELECT 1" +F 4 Terminate