From a7192326c74da417d024a189da4d33c1bf1b40b6 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera Date: Tue, 29 Jun 2021 14:37:39 -0400 Subject: [PATCH] Add PQsendFlushRequest to libpq MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This new libpq function allows the application to send an 'H' message, which instructs the server to flush its outgoing buffer. This hasn't been needed so far because the Sync message already requests a buffer; and I failed to realize that this was needed in pipeline mode because PQpipelineSync also causes the buffer to be flushed. However, sometimes it is useful to request a flush without establishing a synchronization point. Backpatch to 14, where pipeline mode was introduced in libpq. Reported-by: Boris Kolpackov Author: Álvaro Herrera Discussion: https://postgr.es/m/202106252350.t76x73nt643j@alvherre.pgsql --- doc/src/sgml/libpq.sgml | 33 +++++++++++++++++++++++++++-- src/interfaces/libpq/exports.txt | 3 ++- src/interfaces/libpq/fe-exec.c | 36 ++++++++++++++++++++++++++++++++ src/interfaces/libpq/libpq-fe.h | 1 + 4 files changed, 70 insertions(+), 3 deletions(-) diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index 641970f2a6..59e3e678f9 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -5102,10 +5102,13 @@ int PQflush(PGconn *conn); The server executes statements, and returns results, in the order the client sends them. The server will begin executing the commands in the pipeline immediately, not waiting for the end of the pipeline. + Note that results are buffered on the server side; the server flushes + that buffer when a synchronization point is established with + PQpipelineSync, or when + PQsendFlushRequest is called. If any statement encounters an error, the server aborts the current transaction and does not execute any subsequent command in the queue - until the next synchronization point established by - PQpipelineSync; + until the next synchronization point; a PGRES_PIPELINE_ABORTED result is produced for each such command. (This remains true even if the commands in the pipeline would rollback @@ -5399,6 +5402,32 @@ int PQpipelineSync(PGconn *conn); + + + PQsendFlushRequestPQsendFlushRequest + + + + Sends a request for the server to flush its output buffer. + +int PQsendFlushRequest(PGconn *conn); + + + + + Returns 1 for success. Returns 0 on any failure. + + + The server flushes its output buffer automatically as a result of + PQpipelineSync being called, or + on any request when not in pipeline mode; this function is useful + to cause the server to flush its output buffer in pipeline mode + without establishing a synchronization point. + Note that the request is not itself flushed to the server automatically; + use PQflush if necessary. + + + diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index 824a03ffbd..e8bcc88370 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -184,4 +184,5 @@ PQexitPipelineMode 181 PQpipelineSync 182 PQpipelineStatus 183 PQsetTraceFlags 184 -PQmblenBounded 185 +PQmblenBounded 185 +PQsendFlushRequest 186 diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index 7bd5b3a7b9..c1b1269672 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -3099,6 +3099,42 @@ sendFailed: return 0; } +/* + * PQsendFlushRequest + * Send request for server to flush its buffer. Useful in pipeline + * mode when a sync point is not desired. + */ +int +PQsendFlushRequest(PGconn *conn) +{ + if (!conn) + return 0; + + /* Don't try to send if we know there's no live connection. */ + if (conn->status != CONNECTION_OK) + { + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("no connection to the server\n")); + return 0; + } + + /* Can't send while already busy, either, unless enqueuing for later */ + if (conn->asyncStatus != PGASYNC_IDLE && + conn->pipelineStatus == PQ_PIPELINE_OFF) + { + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("another command is already in progress\n")); + return false; + } + + if (pqPutMsgStart('H', conn) < 0 || + pqPutMsgEnd(conn) < 0) + { + return 0; + } + + return 1; +} /* ====== accessor funcs for PGresult ======== */ diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index cc6032b15b..a6fd69aceb 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -470,6 +470,7 @@ extern int PQconsumeInput(PGconn *conn); extern int PQenterPipelineMode(PGconn *conn); extern int PQexitPipelineMode(PGconn *conn); extern int PQpipelineSync(PGconn *conn); +extern int PQsendFlushRequest(PGconn *conn); /* LISTEN/NOTIFY support */ extern PGnotify *PQnotifies(PGconn *conn);