diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index 08326895ee..8895f81add 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -2078,10 +2078,9 @@ PQgetResult(PGconn *conn) /* * We're about to return the NULL that terminates the round of - * results from the current query; prepare to send the results - * of the next query, if any, when we're called next. If there's - * no next element in the command queue, this gets us in IDLE - * state. + * results from the current query; prepare to send the results of + * the next query, if any, when we're called next. If there's no + * next element in the command queue, this gets us in IDLE state. */ resetPQExpBuffer(&conn->errorMessage); pqPipelineProcessQueue(conn); @@ -2089,19 +2088,12 @@ PQgetResult(PGconn *conn) break; case PGASYNC_READY: - - /* - * For any query type other than simple query protocol, we advance - * the command queue here. This is because for simple query - * protocol we can get the READY state multiple times before the - * command is actually complete, since the command string can - * contain many queries. In simple query protocol, the queue - * advance is done by fe-protocol3 when it receives ReadyForQuery. - */ - if (conn->cmd_queue_head && - conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE) - pqCommandQueueAdvance(conn); res = pqPrepareAsyncResult(conn); + + /* Advance the queue as appropriate */ + pqCommandQueueAdvance(conn, false, + res->resultStatus == PGRES_PIPELINE_SYNC); + if (conn->pipelineStatus != PQ_PIPELINE_OFF) { /* @@ -2165,7 +2157,7 @@ PQgetResult(PGconn *conn) } else /* we won't ever see the Close */ - pqCommandQueueAdvance(conn); + pqCommandQueueAdvance(conn, false, false); } if (res) @@ -3027,18 +3019,44 @@ PQexitPipelineMode(PGconn *conn) /* * pqCommandQueueAdvance - * Remove one query from the command queue, when we receive - * all results from the server that pertain to it. + * Remove one query from the command queue, if appropriate. + * + * If we have received all results corresponding to the head element + * in the command queue, remove it. + * + * In simple query protocol we must not advance the command queue until the + * ReadyForQuery message has been received. This is because in simple mode a + * command can have multiple queries, and we must process result for all of + * them before moving on to the next command. + * + * Another consideration is synchronization during error processing in + * extended query protocol: we refuse to advance the queue past a SYNC queue + * element, unless the result we've received is also a SYNC. In particular + * this protects us from advancing when an error is received at an + * inappropriate moment. */ void -pqCommandQueueAdvance(PGconn *conn) +pqCommandQueueAdvance(PGconn *conn, bool isReadyForQuery, bool gotSync) { PGcmdQueueEntry *prevquery; if (conn->cmd_queue_head == NULL) return; - /* delink from queue */ + /* + * If processing a query of simple query protocol, we only advance the + * queue when we receive the ReadyForQuery message for it. + */ + if (conn->cmd_queue_head->queryclass == PGQUERY_SIMPLE && !isReadyForQuery) + return; + + /* + * If we're waiting for a SYNC, don't advance the queue until we get one. + */ + if (conn->cmd_queue_head->queryclass == PGQUERY_SYNC && !gotSync) + return; + + /* delink element from queue */ prevquery = conn->cmd_queue_head; conn->cmd_queue_head = conn->cmd_queue_head->next; @@ -3046,7 +3064,7 @@ pqCommandQueueAdvance(PGconn *conn) if (conn->cmd_queue_head == NULL) conn->cmd_queue_tail = NULL; - /* and make it recyclable */ + /* and make the queue element recyclable */ prevquery->next = NULL; pqRecycleCmdQueueEntry(conn, prevquery); } @@ -3070,6 +3088,7 @@ pqPipelineProcessQueue(PGconn *conn) return; case PGASYNC_IDLE: + /* * If we're in IDLE mode and there's some command in the queue, * get us into PIPELINE_IDLE mode and process normally. Otherwise diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index c33f904db4..0c1903336a 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -240,13 +240,8 @@ pqParseInput3(PGconn *conn) } else { - /* - * In simple query protocol, advance the command queue - * (see PQgetResult). - */ - if (conn->cmd_queue_head && - conn->cmd_queue_head->queryclass == PGQUERY_SIMPLE) - pqCommandQueueAdvance(conn); + /* Advance the command queue and set us idle */ + pqCommandQueueAdvance(conn, true, false); conn->asyncStatus = PGASYNC_IDLE; } break; @@ -287,11 +282,12 @@ pqParseInput3(PGconn *conn) /* Nothing to do for this message type */ break; case '3': /* Close Complete */ + /* * If we get CloseComplete when waiting for it, consume * the queue element and keep going. A result is not - * expected from this message; it is just there so that - * we know to wait for it when PQsendQuery is used in + * expected from this message; it is just there so that we + * know to wait for it when PQsendQuery is used in * pipeline mode, before going in IDLE state. Failing to * do this makes us receive CloseComplete when IDLE, which * creates problems. @@ -299,7 +295,7 @@ pqParseInput3(PGconn *conn) if (conn->cmd_queue_head && conn->cmd_queue_head->queryclass == PGQUERY_CLOSE) { - pqCommandQueueAdvance(conn); + pqCommandQueueAdvance(conn, false, false); } break; diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index fa3898e2b5..d052b8255c 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -656,7 +656,8 @@ extern void pqSaveMessageField(PGresult *res, char code, extern void pqSaveParameterStatus(PGconn *conn, const char *name, const char *value); extern int pqRowProcessor(PGconn *conn, const char **errmsgp); -extern void pqCommandQueueAdvance(PGconn *conn); +extern void pqCommandQueueAdvance(PGconn *conn, bool isReadyForQuery, + bool gotSync); extern int PQsendQueryContinue(PGconn *conn, const char *query); /* === in fe-protocol3.c === */