diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index 04610ccf5e..b9511df2c2 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -2112,19 +2112,12 @@ PQgetResult(PGconn *conn) break; case PGASYNC_READY: - - /* - * For any query type other than simple query protocol, we advance - * the command queue here. This is because for simple query - * protocol we can get the READY state multiple times before the - * command is actually complete, since the command string can - * contain many queries. In simple query protocol, the queue - * advance is done by fe-protocol3 when it receives ReadyForQuery. - */ - if (conn->cmd_queue_head && - conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE) - pqCommandQueueAdvance(conn); res = pqPrepareAsyncResult(conn); + + /* Advance the queue as appropriate */ + pqCommandQueueAdvance(conn, false, + res->resultStatus == PGRES_PIPELINE_SYNC); + if (conn->pipelineStatus != PQ_PIPELINE_OFF) { /* @@ -3088,18 +3081,44 @@ PQexitPipelineMode(PGconn *conn) /* * pqCommandQueueAdvance - * Remove one query from the command queue, when we receive - * all results from the server that pertain to it. + * Remove one query from the command queue, if appropriate. + * + * If we have received all results corresponding to the head element + * in the command queue, remove it. + * + * In simple query protocol we must not advance the command queue until the + * ReadyForQuery message has been received. This is because in simple mode a + * command can have multiple queries, and we must process result for all of + * them before moving on to the next command. + * + * Another consideration is synchronization during error processing in + * extended query protocol: we refuse to advance the queue past a SYNC queue + * element, unless the result we've received is also a SYNC. In particular + * this protects us from advancing when an error is received at an + * inappropriate moment. */ void -pqCommandQueueAdvance(PGconn *conn) +pqCommandQueueAdvance(PGconn *conn, bool isReadyForQuery, bool gotSync) { PGcmdQueueEntry *prevquery; if (conn->cmd_queue_head == NULL) return; - /* delink from queue */ + /* + * If processing a query of simple query protocol, we only advance the + * queue when we receive the ReadyForQuery message for it. + */ + if (conn->cmd_queue_head->queryclass == PGQUERY_SIMPLE && !isReadyForQuery) + return; + + /* + * If we're waiting for a SYNC, don't advance the queue until we get one. + */ + if (conn->cmd_queue_head->queryclass == PGQUERY_SYNC && !gotSync) + return; + + /* delink element from queue */ prevquery = conn->cmd_queue_head; conn->cmd_queue_head = conn->cmd_queue_head->next; @@ -3107,7 +3126,7 @@ pqCommandQueueAdvance(PGconn *conn) if (conn->cmd_queue_head == NULL) conn->cmd_queue_tail = NULL; - /* and make it recyclable */ + /* and make the queue element recyclable */ prevquery->next = NULL; pqRecycleCmdQueueEntry(conn, prevquery); } diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index 5613c56b14..8c4ec079ca 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -240,13 +240,8 @@ pqParseInput3(PGconn *conn) } else { - /* - * In simple query protocol, advance the command queue - * (see PQgetResult). - */ - if (conn->cmd_queue_head && - conn->cmd_queue_head->queryclass == PGQUERY_SIMPLE) - pqCommandQueueAdvance(conn); + /* Advance the command queue and set us idle */ + pqCommandQueueAdvance(conn, true, false); conn->asyncStatus = PGASYNC_IDLE; } break; diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 22bc682ffc..7888199b0d 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -698,7 +698,8 @@ extern void pqSaveMessageField(PGresult *res, char code, extern void pqSaveParameterStatus(PGconn *conn, const char *name, const char *value); extern int pqRowProcessor(PGconn *conn, const char **errmsgp); -extern void pqCommandQueueAdvance(PGconn *conn); +extern void pqCommandQueueAdvance(PGconn *conn, bool isReadyForQuery, + bool gotSync); extern int PQsendQueryContinue(PGconn *conn, const char *query); /* === in fe-protocol3.c === */