Add PQsendFlushRequest to libpq

This new libpq function allows the application to send an 'H' message,
which instructs the server to flush its outgoing buffer.

This hasn't been needed so far because the Sync message already requests
a buffer; and I failed to realize that this was needed in pipeline mode
because PQpipelineSync also causes the buffer to be flushed.  However,
sometimes it is useful to request a flush without establishing a
synchronization point.

Backpatch to 14, where pipeline mode was introduced in libpq.

Reported-by: Boris Kolpackov <boris@codesynthesis.com>
Author: Álvaro Herrera <alvherre@alvh.no-ip.org>
Discussion: https://postgr.es/m/202106252350.t76x73nt643j@alvherre.pgsql
This commit is contained in:
Alvaro Herrera 2021-06-29 14:37:39 -04:00
parent dd2364ced9
commit a7192326c7
No known key found for this signature in database
GPG Key ID: 1C20ACB9D5C564AE
4 changed files with 70 additions and 3 deletions

View File

@ -5102,10 +5102,13 @@ int PQflush(PGconn *conn);
The server executes statements, and returns results, in the order the The server executes statements, and returns results, in the order the
client sends them. The server will begin executing the commands in the client sends them. The server will begin executing the commands in the
pipeline immediately, not waiting for the end of the pipeline. pipeline immediately, not waiting for the end of the pipeline.
Note that results are buffered on the server side; the server flushes
that buffer when a synchronization point is established with
<function>PQpipelineSync</function>, or when
<function>PQsendFlushRequest</function> is called.
If any statement encounters an error, the server aborts the current If any statement encounters an error, the server aborts the current
transaction and does not execute any subsequent command in the queue transaction and does not execute any subsequent command in the queue
until the next synchronization point established by until the next synchronization point;
<function>PQpipelineSync</function>;
a <literal>PGRES_PIPELINE_ABORTED</literal> result is produced for a <literal>PGRES_PIPELINE_ABORTED</literal> result is produced for
each such command. each such command.
(This remains true even if the commands in the pipeline would rollback (This remains true even if the commands in the pipeline would rollback
@ -5399,6 +5402,32 @@ int PQpipelineSync(PGconn *conn);
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry id="libpq-PQsendFlushRequest">
<term><function>PQsendFlushRequest</function><indexterm><primary>PQsendFlushRequest</primary></indexterm></term>
<listitem>
<para>
Sends a request for the server to flush its output buffer.
<synopsis>
int PQsendFlushRequest(PGconn *conn);
</synopsis>
</para>
<para>
Returns 1 for success. Returns 0 on any failure.
</para>
<para>
The server flushes its output buffer automatically as a result of
<function>PQpipelineSync</function> being called, or
on any request when not in pipeline mode; this function is useful
to cause the server to flush its output buffer in pipeline mode
without establishing a synchronization point.
Note that the request is not itself flushed to the server automatically;
use <function>PQflush</function> if necessary.
</para>
</listitem>
</varlistentry>
</variablelist> </variablelist>
</sect2> </sect2>

View File

@ -184,4 +184,5 @@ PQexitPipelineMode 181
PQpipelineSync 182 PQpipelineSync 182
PQpipelineStatus 183 PQpipelineStatus 183
PQsetTraceFlags 184 PQsetTraceFlags 184
PQmblenBounded 185 PQmblenBounded 185
PQsendFlushRequest 186

View File

@ -3099,6 +3099,42 @@ sendFailed:
return 0; return 0;
} }
/*
* PQsendFlushRequest
* Send request for server to flush its buffer. Useful in pipeline
* mode when a sync point is not desired.
*/
int
PQsendFlushRequest(PGconn *conn)
{
if (!conn)
return 0;
/* Don't try to send if we know there's no live connection. */
if (conn->status != CONNECTION_OK)
{
appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("no connection to the server\n"));
return 0;
}
/* Can't send while already busy, either, unless enqueuing for later */
if (conn->asyncStatus != PGASYNC_IDLE &&
conn->pipelineStatus == PQ_PIPELINE_OFF)
{
appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("another command is already in progress\n"));
return false;
}
if (pqPutMsgStart('H', conn) < 0 ||
pqPutMsgEnd(conn) < 0)
{
return 0;
}
return 1;
}
/* ====== accessor funcs for PGresult ======== */ /* ====== accessor funcs for PGresult ======== */

View File

@ -470,6 +470,7 @@ extern int PQconsumeInput(PGconn *conn);
extern int PQenterPipelineMode(PGconn *conn); extern int PQenterPipelineMode(PGconn *conn);
extern int PQexitPipelineMode(PGconn *conn); extern int PQexitPipelineMode(PGconn *conn);
extern int PQpipelineSync(PGconn *conn); extern int PQpipelineSync(PGconn *conn);
extern int PQsendFlushRequest(PGconn *conn);
/* LISTEN/NOTIFY support */ /* LISTEN/NOTIFY support */
extern PGnotify *PQnotifies(PGconn *conn); extern PGnotify *PQnotifies(PGconn *conn);