Implement pipeline mode in libpq

Pipeline mode in libpq lets an application avoid the Sync messages in
the FE/BE protocol that are implicit in the old libpq API after each
query.  The application can then insert Sync at its leisure with a new
libpq function PQpipelineSync.  This can lead to substantial reductions
in query latency.

Co-authored-by: Craig Ringer <craig.ringer@enterprisedb.com>
Co-authored-by: Matthieu Garrigues <matthieu.garrigues@gmail.com>
Co-authored-by: Álvaro Herrera <alvherre@alvh.no-ip.org>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Aya Iwata <iwata.aya@jp.fujitsu.com>
Reviewed-by: Daniel Vérité <daniel@manitou-mail.org>
Reviewed-by: David G. Johnston <david.g.johnston@gmail.com>
Reviewed-by: Justin Pryzby <pryzby@telsasoft.com>
Reviewed-by: Kirk Jamison <k.jamison@fujitsu.com>
Reviewed-by: Michael Paquier <michael.paquier@gmail.com>
Reviewed-by: Nikhil Sontakke <nikhils@2ndquadrant.com>
Reviewed-by: Vaishnavi Prabakaran <VaishnaviP@fast.au.fujitsu.com>
Reviewed-by: Zhihong Yu <zyu@yugabyte.com>

Discussion: https://postgr.es/m/CAMsr+YFUjJytRyV4J-16bEoiZyH=4nj+sQ7JP9ajwz=B4dMMZw@mail.gmail.com
Discussion: https://postgr.es/m/CAJkzx4T5E-2cQe3dtv2R78dYFvz+in8PY7A8MArvLhs_pg75gg@mail.gmail.com
This commit is contained in:
Alvaro Herrera 2021-03-15 18:13:42 -03:00
parent 146cb3889c
commit acb7e4eb6b
No known key found for this signature in database
GPG Key ID: 1C20ACB9D5C564AE
18 changed files with 2706 additions and 113 deletions

View File

@ -3180,6 +3180,33 @@ ExecStatusType PQresultStatus(const PGresult *res);
</para>
</listitem>
</varlistentry>
<varlistentry id="libpq-pgres-pipeline-sync">
<term><literal>PGRES_PIPELINE_SYNC</literal></term>
<listitem>
<para>
The <structname>PGresult</structname> represents a
synchronization point in pipeline mode, requested by
<xref linkend="libpq-PQpipelineSync"/>.
This status occurs only when pipeline mode has been selected.
</para>
</listitem>
</varlistentry>
<varlistentry id="libpq-pgres-pipeline-aborted">
<term><literal>PGRES_PIPELINE_ABORTED</literal></term>
<listitem>
<para>
The <structname>PGresult</structname> represents a pipeline that has
received an error from the server. <function>PQgetResult</function>
must be called repeatedly, and each time it will return this status code
until the end of the current pipeline, at which point it will return
<literal>PGRES_PIPELINE_SYNC</literal> and normal processing can
resume.
</para>
</listitem>
</varlistentry>
</variablelist>
If the result status is <literal>PGRES_TUPLES_OK</literal> or
@ -4677,8 +4704,9 @@ int PQsendDescribePortal(PGconn *conn, const char *portalName);
<xref linkend="libpq-PQsendQueryParams"/>,
<xref linkend="libpq-PQsendPrepare"/>,
<xref linkend="libpq-PQsendQueryPrepared"/>,
<xref linkend="libpq-PQsendDescribePrepared"/>, or
<xref linkend="libpq-PQsendDescribePortal"/>
<xref linkend="libpq-PQsendDescribePrepared"/>,
<xref linkend="libpq-PQsendDescribePortal"/>, or
<xref linkend="libpq-PQpipelineSync"/>
call, and returns it.
A null pointer is returned when the command is complete and there
will be no more results.
@ -4702,6 +4730,19 @@ PGresult *PQgetResult(PGconn *conn);
<xref linkend="libpq-PQconsumeInput"/>.
</para>
<para>
In pipeline mode, <function>PQgetResult</function> will return normally
unless an error occurs; for any subsequent query sent after the one
that caused the error until (and excluding) the next synchronization point,
a special result of type <literal>PGRES_PIPELINE_ABORTED</literal> will
be returned, and a null pointer will be returned after it.
When the pipeline synchronization point is reached, a result of type
<literal>PGRES_PIPELINE_SYNC</literal> will be returned.
The result of the next query after the synchronization point follows
immediately (that is, no null pointer is returned after
the synchronization point.)
</para>
<note>
<para>
Even when <xref linkend="libpq-PQresultStatus"/> indicates a fatal
@ -4926,6 +4967,476 @@ int PQflush(PGconn *conn);
</sect1>
<sect1 id="libpq-pipeline-mode">
<title>Pipeline Mode</title>
<indexterm zone="libpq-pipeline-mode">
<primary>libpq</primary>
<secondary>pipeline mode</secondary>
</indexterm>
<indexterm zone="libpq-pipeline-mode">
<primary>pipelining</primary>
<secondary>in libpq</secondary>
</indexterm>
<indexterm zone="libpq-pipeline-mode">
<primary>batch mode</primary>
<secondary>in libpq</secondary>
</indexterm>
<para>
<application>libpq</application> pipeline mode allows applications to
send a query without having to read the result of the previously
sent query. Taking advantage of the pipeline mode, a client will wait
less for the server, since multiple queries/results can be
sent/received in a single network transaction.
</para>
<para>
While pipeline mode provides a significant performance boost, writing
clients using the pipeline mode is more complex because it involves
managing a queue of pending queries and finding which result
corresponds to which query in the queue.
</para>
<para>
Pipeline mode also generally consumes more memory on both the client and server,
though careful and aggressive management of the send/receive queue can mitigate
this. This applies whether or not the connection is in blocking or non-blocking
mode.
</para>
<para>
While the pipeline API was introduced in
<productname>PostgreSQL</productname> 14, it is a client-side feature
which doesn't require special server support, and works on any server
that supports the v3 extended query protocol.
</para>
<sect2 id="libpq-pipeline-using">
<title>Using Pipeline Mode</title>
<para>
To issue pipelines, the application must switch the connection
into pipeline mode,
which is done with <xref linkend="libpq-PQenterPipelineMode"/>.
<xref linkend="libpq-PQpipelineStatus"/> can be used
to test whether pipeline mode is active.
In pipeline mode, only <link linkend="libpq-async">asynchronous operations</link>
are permitted, and <literal>COPY</literal> is disallowed.
Using synchronous command execution functions
such as <function>PQfn</function>,
<function>PQexec</function>,
<function>PQexecParams</function>,
<function>PQprepare</function>,
<function>PQexecPrepared</function>,
<function>PQdescribePrepared</function>,
<function>PQdescribePortal</function>,
is an error condition.
Once all dispatched commands have had their results processed, and
the end pipeline result has been consumed, the application may return
to non-pipelined mode with <xref linkend="libpq-PQexitPipelineMode"/>.
</para>
<note>
<para>
It is best to use pipeline mode with <application>libpq</application> in
<link linkend="libpq-PQsetnonblocking">non-blocking mode</link>. If used
in blocking mode it is possible for a client/server deadlock to occur.
<footnote>
<para>
The client will block trying to send queries to the server, but the
server will block trying to send results to the client from queries
it has already processed. This only occurs when the client sends
enough queries to fill both its output buffer and the server's receive
buffer before it switches to processing input from the server,
but it's hard to predict exactly when that will happen.
</para>
</footnote>
</para>
</note>
<sect3 id="libpq-pipeline-sending">
<title>Issuing Queries</title>
<para>
After entering pipeline mode, the application dispatches requests using
<xref linkend="libpq-PQsendQuery"/>,
<xref linkend="libpq-PQsendQueryParams"/>,
or its prepared-query sibling
<xref linkend="libpq-PQsendQueryPrepared"/>.
These requests are queued on the client-side until flushed to the server;
this occurs when <xref linkend="libpq-PQpipelineSync"/> is used to
establish a synchronization point in the pipeline,
or when <xref linkend="libpq-PQflush"/> is called.
The functions <xref linkend="libpq-PQsendPrepare"/>,
<xref linkend="libpq-PQsendDescribePrepared"/>, and
<xref linkend="libpq-PQsendDescribePortal"/> also work in pipeline mode.
Result processing is described below.
</para>
<para>
The server executes statements, and returns results, in the order the
client sends them. The server will begin executing the commands in the
pipeline immediately, not waiting for the end of the pipeline.
If any statement encounters an error, the server aborts the current
transaction and does not execute any subsequent command in the queue
until the next synchronization point established by
<function>PQpipelineSync</function>;
a <literal>PGRES_PIPELINE_ABORTED</literal> result is produced for
each such command.
(This remains true even if the commands in the pipeline would rollback
the transaction.)
Query processing resumes after the synchronization point.
</para>
<para>
It's fine for one operation to depend on the results of a
prior one; for example, one query may define a table that the next
query in the same pipeline uses. Similarly, an application may
create a named prepared statement and execute it with later
statements in the same pipeline.
</para>
</sect3>
<sect3 id="libpq-pipeline-results">
<title>Processing Results</title>
<para>
To process the result of one query in a pipeline, the application calls
<function>PQgetResult</function> repeatedly and handles each result
until <function>PQgetResult</function> returns null.
The result from the next query in the pipeline may then be retrieved using
<function>PQgetResult</function> again and the cycle repeated.
The application handles individual statement results as normal.
When the results of all the queries in the pipeline have been
returned, <function>PQgetResult</function> returns a result
containing the status value <literal>PGRES_PIPELINE_SYNC</literal>
</para>
<para>
The client may choose to defer result processing until the complete
pipeline has been sent, or interleave that with sending further
queries in the pipeline; see <xref linkend="libpq-pipeline-interleave"/>.
</para>
<para>
To enter single-row mode, call <function>PQsetSingleRowMode</function>
before retrieving results with <function>PQgetResult</function>.
This mode selection is effective only for the query currently
being processed. For more information on the use of
<function>PQsetSingleRowMode</function>,
refer to <xref linkend="libpq-single-row-mode"/>.
</para>
<para>
<function>PQgetResult</function> behaves the same as for normal
asynchronous processing except that it may contain the new
<type>PGresult</type> types <literal>PGRES_PIPELINE_SYNC</literal>
and <literal>PGRES_PIPELINE_ABORTED</literal>.
<literal>PGRES_PIPELINE_SYNC</literal> is reported exactly once for each
<function>PQpipelineSync</function> at the corresponding point
in the pipeline.
<literal>PGRES_PIPELINE_ABORTED</literal> is emitted in place of a normal
query result for the first error and all subsequent results
until the next <literal>PGRES_PIPELINE_SYNC</literal>;
see <xref linkend="libpq-pipeline-errors"/>.
</para>
<para>
<function>PQisBusy</function>, <function>PQconsumeInput</function>, etc
operate as normal when processing pipeline results.
</para>
<para>
<application>libpq</application> does not provide any information to the
application about the query currently being processed (except that
<function>PQgetResult</function> returns null to indicate that we start
returning the results of next query). The application must keep track
of the order in which it sent queries, to associate them with their
corresponding results.
Applications will typically use a state machine or a FIFO queue for this.
</para>
</sect3>
<sect3 id="libpq-pipeline-errors">
<title>Error Handling</title>
<para>
From the client's perspective, after <function>PQresultStatus</function>
returns <literal>PGRES_FATAL_ERROR</literal>,
the pipeline is flagged as aborted.
<function>PQresultStatus</function> will report a
<literal>PGRES_PIPELINE_ABORTED</literal> result for each remaining queued
operation in an aborted pipeline. The result for
<function>PQpipelineSync</function> is reported as
<literal>PGRES_PIPELINE_SYNC</literal> to signal the end of the aborted pipeline
and resumption of normal result processing.
</para>
<para>
The client <emphasis>must</emphasis> process results with
<function>PQgetResult</function> during error recovery.
</para>
<para>
If the pipeline used an implicit transaction, then operations that have
already executed are rolled back and operations that were queued to follow
the failed operation are skipped entirely. The same behavior holds if the
pipeline starts and commits a single explicit transaction (i.e. the first
statement is <literal>BEGIN</literal> and the last is
<literal>COMMIT</literal>) except that the session remains in an aborted
transaction state at the end of the pipeline. If a pipeline contains
<emphasis>multiple explicit transactions</emphasis>, all transactions that
committed prior to the error remain committed, the currently in-progress
transaction is aborted, and all subsequent operations are skipped completely,
including subsequent transactions. If a pipeline synchronization point
occurs with an explicit transaction block in aborted state, the next pipeline
will become aborted immediately unless the next command puts the transaction
in normal mode with <command>ROLLBACK</command>.
</para>
<note>
<para>
The client must not assume that work is committed when it
<emphasis>sends</emphasis> a <literal>COMMIT</literal> &mdash; only when the
corresponding result is received to confirm the commit is complete.
Because errors arrive asynchronously, the application needs to be able to
restart from the last <emphasis>received</emphasis> committed change and
resend work done after that point if something goes wrong.
</para>
</note>
</sect3>
<sect3 id="libpq-pipeline-interleave">
<title>Interleaving Result Processing and Query Dispatch</title>
<para>
To avoid deadlocks on large pipelines the client should be structured
around a non-blocking event loop using operating system facilities
such as <function>select</function>, <function>poll</function>,
<function>WaitForMultipleObjectEx</function>, etc.
</para>
<para>
The client application should generally maintain a queue of work
remaining to be dispatched and a queue of work that has been dispatched
but not yet had its results processed. When the socket is writable
it should dispatch more work. When the socket is readable it should
read results and process them, matching them up to the next entry in
its corresponding results queue. Based on available memory, results from the
socket should be read frequently: there's no need to wait until the
pipeline end to read the results. Pipelines should be scoped to logical
units of work, usually (but not necessarily) one transaction per pipeline.
There's no need to exit pipeline mode and re-enter it between pipelines,
or to wait for one pipeline to finish before sending the next.
</para>
<para>
An example using <function>select()</function> and a simple state
machine to track sent and received work is in
<filename>src/test/modules/libpq_pipeline/libpq_pipeline.c</filename>
in the PostgreSQL source distribution.
</para>
</sect3>
</sect2>
<sect2 id="libpq-pipeline-functions">
<title>Functions Associated with Pipeline Mode</title>
<variablelist>
<varlistentry id="libpq-PQpipelineStatus">
<term><function>PQpipelineStatus</function><indexterm><primary>PQpipelineStatus</primary></indexterm></term>
<listitem>
<para>
Returns the current pipeline mode status of the
<application>libpq</application> connection.
<synopsis>
PGpipelineStatus PQpipelineStatus(const PGconn *conn);
</synopsis>
</para>
<para>
<function>PQpipelineStatus</function> can return one of the following values:
<variablelist>
<varlistentry>
<term>
<literal>PQ_PIPELINE_ON</literal>
</term>
<listitem>
<para>
The <application>libpq</application> connection is in
pipeline mode.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
<literal>PQ_PIPELINE_OFF</literal>
</term>
<listitem>
<para>
The <application>libpq</application> connection is
<emphasis>not</emphasis> in pipeline mode.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
<literal>PQ_PIPELINE_ABORTED</literal>
</term>
<listitem>
<para>
The <application>libpq</application> connection is in pipeline
mode and an error occurred while processing the current pipeline.
The aborted flag is cleared when <function>PQgetResult</function>
returns a result of type <literal>PGRES_PIPELINE_SYNC</literal>.
</para>
</listitem>
</varlistentry>
</variablelist>
</para>
</listitem>
</varlistentry>
<varlistentry id="libpq-PQenterPipelineMode">
<term><function>PQenterPipelineMode</function><indexterm><primary>PQenterPipelineMode</primary></indexterm></term>
<listitem>
<para>
Causes a connection to enter pipeline mode if it is currently idle or
already in pipeline mode.
<synopsis>
int PQenterPipelineMode(PGconn *conn);
</synopsis>
</para>
<para>
Returns 1 for success.
Returns 0 and has no effect if the connection is not currently
idle, i.e., it has a result ready, or it is waiting for more
input from the server, etc.
This function does not actually send anything to the server,
it just changes the <application>libpq</application> connection
state.
</para>
</listitem>
</varlistentry>
<varlistentry id="libpq-PQexitPipelineMode">
<term><function>PQexitPipelineMode</function><indexterm><primary>PQexitPipelineMode</primary></indexterm></term>
<listitem>
<para>
Causes a connection to exit pipeline mode if it is currently in pipeline mode
with an empty queue and no pending results.
<synopsis>
int PQexitPipelineMode(PGconn *conn);
</synopsis>
</para>
<para>
Returns 1 for success. Returns 1 and takes no action if not in
pipeline mode. If the current statement isn't finished processing,
or <function>PQgetResult</function> has not been called to collect
results from all previously sent query, returns 0 (in which case,
use <xref linkend="libpq-PQerrorMessage"/> to get more information
about the failure).
</para>
</listitem>
</varlistentry>
<varlistentry id="libpq-PQpipelineSync">
<term><function>PQpipelineSync</function><indexterm><primary>PQpipelineSync</primary></indexterm></term>
<listitem>
<para>
Marks a synchronization point in a pipeline by sending a
<link linkend="protocol-flow-ext-query">sync message</link>
and flushing the send buffer. This serves as
the delimiter of an implicit transaction and an error recovery
point; see <xref linkend="libpq-pipeline-errors"/>.
<synopsis>
int PQpipelineSync(PGconn *conn);
</synopsis>
</para>
<para>
Returns 1 for success. Returns 0 if the connection is not in
pipeline mode or sending a
<link linkend="protocol-flow-ext-query">sync message</link>
failed.
</para>
</listitem>
</varlistentry>
</variablelist>
</sect2>
<sect2 id="libpq-pipeline-tips">
<title>When to Use Pipeline Mode</title>
<para>
Much like asynchronous query mode, there is no meaningful performance
overhead when using pipeline mode. It increases client application complexity,
and extra caution is required to prevent client/server deadlocks, but
pipeline mode can offer considerable performance improvements, in exchange for
increased memory usage from leaving state around longer.
</para>
<para>
Pipeline mode is most useful when the server is distant, i.e., network latency
(<quote>ping time</quote>) is high, and also when many small operations
are being performed in rapid succession. There is usually less benefit
in using pipelined commands when each query takes many multiples of the client/server
round-trip time to execute. A 100-statement operation run on a server
300ms round-trip-time away would take 30 seconds in network latency alone
without pipelining; with pipelining it may spend as little as 0.3s waiting for
results from the server.
</para>
<para>
Use pipelined commands when your application does lots of small
<literal>INSERT</literal>, <literal>UPDATE</literal> and
<literal>DELETE</literal> operations that can't easily be transformed
into operations on sets, or into a <literal>COPY</literal> operation.
</para>
<para>
Pipeline mode is not useful when information from one operation is required by
the client to produce the next operation. In such cases, the client
would have to introduce a synchronization point and wait for a full client/server
round-trip to get the results it needs. However, it's often possible to
adjust the client design to exchange the required information server-side.
Read-modify-write cycles are especially good candidates; for example:
<programlisting>
BEGIN;
SELECT x FROM mytable WHERE id = 42 FOR UPDATE;
-- result: x=2
-- client adds 1 to x:
UPDATE mytable SET x = 3 WHERE id = 42;
COMMIT;
</programlisting>
could be much more efficiently done with:
<programlisting>
UPDATE mytable SET x = x + 1 WHERE id = 42;
</programlisting>
</para>
<para>
Pipelining is less useful, and more complex, when a single pipeline contains
multiple transactions (see <xref linkend="libpq-pipeline-errors"/>).
</para>
</sect2>
</sect1>
<sect1 id="libpq-single-row-mode">
<title>Retrieving Query Results Row-by-Row</title>
@ -4966,6 +5477,13 @@ int PQflush(PGconn *conn);
Each object should be freed with <xref linkend="libpq-PQclear"/> as usual.
</para>
<para>
When using pipeline mode, single-row mode needs to be activated for each
query in the pipeline before retrieving results for that query
with <function>PQgetResult</function>.
See <xref linkend="libpq-pipeline-mode"/> for more information.
</para>
<para>
<variablelist>
<varlistentry id="libpq-PQsetSingleRowMode">

View File

@ -130,6 +130,10 @@
<application>libpq</application> library.
</para>
<para>
Client applications cannot use these functions while a libpq connection is in pipeline mode.
</para>
<sect2 id="lo-create">
<title>Creating a Large Object</title>

View File

@ -1019,6 +1019,12 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
walres->err = _("empty query");
break;
case PGRES_PIPELINE_SYNC:
case PGRES_PIPELINE_ABORTED:
walres->status = WALRCV_ERROR;
walres->err = _("unexpected pipeline mode");
break;
case PGRES_NONFATAL_ERROR:
case PGRES_FATAL_ERROR:
case PGRES_BAD_RESPONSE:

View File

@ -929,6 +929,8 @@ should_processing_continue(PGresult *res)
case PGRES_COPY_IN:
case PGRES_COPY_BOTH:
case PGRES_SINGLE_TUPLE:
case PGRES_PIPELINE_SYNC:
case PGRES_PIPELINE_ABORTED:
return false;
}
return true;

View File

@ -179,3 +179,7 @@ PQgetgssctx 176
PQsetSSLKeyPassHook_OpenSSL 177
PQgetSSLKeyPassHook_OpenSSL 178
PQdefaultSSLKeyPassHook_OpenSSL 179
PQenterPipelineMode 180
PQexitPipelineMode 181
PQpipelineSync 182
PQpipelineStatus 183

View File

@ -522,6 +522,23 @@ pqDropConnection(PGconn *conn, bool flushInput)
}
}
/*
* pqFreeCommandQueue
* Free all the entries of PGcmdQueueEntry queue passed.
*/
static void
pqFreeCommandQueue(PGcmdQueueEntry *queue)
{
while (queue != NULL)
{
PGcmdQueueEntry *cur = queue;
queue = cur->next;
if (cur->query)
free(cur->query);
free(cur);
}
}
/*
* pqDropServerData
@ -553,6 +570,12 @@ pqDropServerData(PGconn *conn)
}
conn->notifyHead = conn->notifyTail = NULL;
pqFreeCommandQueue(conn->cmd_queue_head);
conn->cmd_queue_head = conn->cmd_queue_tail = NULL;
pqFreeCommandQueue(conn->cmd_queue_recycle);
conn->cmd_queue_recycle = NULL;
/* Reset ParameterStatus data, as well as variables deduced from it */
pstatus = conn->pstatus;
while (pstatus != NULL)
@ -2459,6 +2482,7 @@ keep_going: /* We will come back to here until there is
/* Drop any PGresult we might have, too */
conn->asyncStatus = PGASYNC_IDLE;
conn->xactStatus = PQTRANS_IDLE;
conn->pipelineStatus = PQ_PIPELINE_OFF;
pqClearAsyncResult(conn);
/* Reset conn->status to put the state machine in the right state */
@ -3917,6 +3941,7 @@ makeEmptyPGconn(void)
conn->status = CONNECTION_BAD;
conn->asyncStatus = PGASYNC_IDLE;
conn->pipelineStatus = PQ_PIPELINE_OFF;
conn->xactStatus = PQTRANS_IDLE;
conn->options_valid = false;
conn->nonblocking = false;
@ -4084,8 +4109,6 @@ freePGconn(PGconn *conn)
if (conn->connip)
free(conn->connip);
/* Note that conn->Pfdebug is not ours to close or free */
if (conn->last_query)
free(conn->last_query);
if (conn->write_err_msg)
free(conn->write_err_msg);
if (conn->inBuffer)
@ -4174,6 +4197,7 @@ closePGconn(PGconn *conn)
conn->status = CONNECTION_BAD; /* Well, not really _bad_ - just absent */
conn->asyncStatus = PGASYNC_IDLE;
conn->xactStatus = PQTRANS_IDLE;
conn->pipelineStatus = PQ_PIPELINE_OFF;
pqClearAsyncResult(conn); /* deallocate result */
resetPQExpBuffer(&conn->errorMessage);
release_conn_addrinfo(conn);
@ -6726,6 +6750,15 @@ PQbackendPID(const PGconn *conn)
return conn->be_pid;
}
PGpipelineStatus
PQpipelineStatus(const PGconn *conn)
{
if (!conn)
return PQ_PIPELINE_OFF;
return conn->pipelineStatus;
}
int
PQconnectionNeedsPassword(const PGconn *conn)
{

View File

@ -39,7 +39,9 @@ char *const pgresStatus[] = {
"PGRES_NONFATAL_ERROR",
"PGRES_FATAL_ERROR",
"PGRES_COPY_BOTH",
"PGRES_SINGLE_TUPLE"
"PGRES_SINGLE_TUPLE",
"PGRES_PIPELINE_SYNC",
"PGRES_PIPELINE_ABORTED"
};
/*
@ -71,6 +73,8 @@ static PGresult *PQexecFinish(PGconn *conn);
static int PQsendDescribe(PGconn *conn, char desc_type,
const char *desc_target);
static int check_field_number(const PGresult *res, int field_num);
static void pqPipelineProcessQueue(PGconn *conn);
static int pqPipelineFlush(PGconn *conn);
/* ----------------
@ -1171,7 +1175,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
conn->next_result = conn->result;
conn->result = res;
/* And mark the result ready to return */
conn->asyncStatus = PGASYNC_READY;
conn->asyncStatus = PGASYNC_READY_MORE;
}
return 1;
@ -1184,6 +1188,87 @@ fail:
}
/*
* pqAllocCmdQueueEntry
* Get a command queue entry for caller to fill.
*
* If the recycle queue has a free element, that is returned; if not, a
* fresh one is allocated. Caller is responsible for adding it to the
* command queue (pqAppendCmdQueueEntry) once the struct is filled in, or
* releasing the memory (pqRecycleCmdQueueEntry) if an error occurs.
*
* If allocation fails, sets the error message and returns NULL.
*/
static PGcmdQueueEntry *
pqAllocCmdQueueEntry(PGconn *conn)
{
PGcmdQueueEntry *entry;
if (conn->cmd_queue_recycle == NULL)
{
entry = (PGcmdQueueEntry *) malloc(sizeof(PGcmdQueueEntry));
if (entry == NULL)
{
appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("out of memory\n"));
return NULL;
}
}
else
{
entry = conn->cmd_queue_recycle;
conn->cmd_queue_recycle = entry->next;
}
entry->next = NULL;
entry->query = NULL;
return entry;
}
/*
* pqAppendCmdQueueEntry
* Append a caller-allocated command queue entry to the queue.
*
* The query itself must already have been put in the output buffer by the
* caller.
*/
static void
pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
{
Assert(entry->next == NULL);
if (conn->cmd_queue_head == NULL)
conn->cmd_queue_head = entry;
else
conn->cmd_queue_tail->next = entry;
conn->cmd_queue_tail = entry;
}
/*
* pqRecycleCmdQueueEntry
* Push a command queue entry onto the freelist.
*/
static void
pqRecycleCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
{
if (entry == NULL)
return;
/* recyclable entries should not have a follow-on command */
Assert(entry->next == NULL);
if (entry->query)
{
free(entry->query);
entry->query = NULL;
}
entry->next = conn->cmd_queue_recycle;
conn->cmd_queue_recycle = entry;
}
/*
* PQsendQuery
* Submit a query, but don't wait for it to finish
@ -1209,9 +1294,15 @@ PQsendQueryContinue(PGconn *conn, const char *query)
static int
PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
{
PGcmdQueueEntry *entry = NULL;
if (!PQsendQueryStart(conn, newQuery))
return 0;
entry = pqAllocCmdQueueEntry(conn);
if (entry == NULL)
return 0; /* error msg already set */
/* check the argument */
if (!query)
{
@ -1220,37 +1311,75 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
return 0;
}
/* construct the outgoing Query message */
if (pqPutMsgStart('Q', conn) < 0 ||
pqPuts(query, conn) < 0 ||
pqPutMsgEnd(conn) < 0)
/* Send the query message(s) */
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
{
/* error message should be set up already */
return 0;
/* construct the outgoing Query message */
if (pqPutMsgStart('Q', conn) < 0 ||
pqPuts(query, conn) < 0 ||
pqPutMsgEnd(conn) < 0)
{
/* error message should be set up already */
return 0;
}
/* remember we are using simple query protocol */
entry->queryclass = PGQUERY_SIMPLE;
/* and remember the query text too, if possible */
entry->query = strdup(query);
}
else
{
/*
* In pipeline mode we cannot use the simple protocol, so we send
* Parse, Bind, Describe Portal, Execute.
*/
if (pqPutMsgStart('P', conn) < 0 ||
pqPuts("", conn) < 0 ||
pqPuts(query, conn) < 0 ||
pqPutInt(0, 2, conn) < 0 ||
pqPutMsgEnd(conn) < 0)
goto sendFailed;
if (pqPutMsgStart('B', conn) < 0 ||
pqPuts("", conn) < 0 ||
pqPuts("", conn) < 0 ||
pqPutInt(0, 2, conn) < 0 ||
pqPutInt(0, 2, conn) < 0 ||
pqPutInt(0, 2, conn) < 0 ||
pqPutMsgEnd(conn) < 0)
goto sendFailed;
if (pqPutMsgStart('D', conn) < 0 ||
pqPutc('P', conn) < 0 ||
pqPuts("", conn) < 0 ||
pqPutMsgEnd(conn) < 0)
goto sendFailed;
if (pqPutMsgStart('E', conn) < 0 ||
pqPuts("", conn) < 0 ||
pqPutInt(0, 4, conn) < 0 ||
pqPutMsgEnd(conn) < 0)
goto sendFailed;
/* remember we are using simple query protocol */
conn->queryclass = PGQUERY_SIMPLE;
/* and remember the query text too, if possible */
/* if insufficient memory, last_query just winds up NULL */
if (conn->last_query)
free(conn->last_query);
conn->last_query = strdup(query);
entry->queryclass = PGQUERY_EXTENDED;
entry->query = strdup(query);
}
/*
* Give the data a push. In nonblock mode, don't complain if we're unable
* to send it all; PQgetResult() will do any additional flushing needed.
*/
if (pqFlush(conn) < 0)
{
/* error message should be set up already */
return 0;
}
if (pqPipelineFlush(conn) < 0)
goto sendFailed;
/* OK, it's launched! */
conn->asyncStatus = PGASYNC_BUSY;
pqAppendCmdQueueEntry(conn, entry);
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
conn->asyncStatus = PGASYNC_BUSY;
return 1;
sendFailed:
pqRecycleCmdQueueEntry(conn, entry);
/* error message should be set up already */
return 0;
}
/*
@ -1307,6 +1436,8 @@ PQsendPrepare(PGconn *conn,
const char *stmtName, const char *query,
int nParams, const Oid *paramTypes)
{
PGcmdQueueEntry *entry = NULL;
if (!PQsendQueryStart(conn, true))
return 0;
@ -1330,6 +1461,10 @@ PQsendPrepare(PGconn *conn,
return 0;
}
entry = pqAllocCmdQueueEntry(conn);
if (entry == NULL)
return 0; /* error msg already set */
/* construct the Parse message */
if (pqPutMsgStart('P', conn) < 0 ||
pqPuts(stmtName, conn) < 0 ||
@ -1356,32 +1491,38 @@ PQsendPrepare(PGconn *conn,
if (pqPutMsgEnd(conn) < 0)
goto sendFailed;
/* construct the Sync message */
if (pqPutMsgStart('S', conn) < 0 ||
pqPutMsgEnd(conn) < 0)
goto sendFailed;
/* Add a Sync, unless in pipeline mode. */
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
{
if (pqPutMsgStart('S', conn) < 0 ||
pqPutMsgEnd(conn) < 0)
goto sendFailed;
}
/* remember we are doing just a Parse */
conn->queryclass = PGQUERY_PREPARE;
entry->queryclass = PGQUERY_PREPARE;
/* and remember the query text too, if possible */
/* if insufficient memory, last_query just winds up NULL */
if (conn->last_query)
free(conn->last_query);
conn->last_query = strdup(query);
/* if insufficient memory, query just winds up NULL */
entry->query = strdup(query);
pqAppendCmdQueueEntry(conn, entry);
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
conn->asyncStatus = PGASYNC_BUSY;
/*
* Give the data a push. In nonblock mode, don't complain if we're unable
* to send it all; PQgetResult() will do any additional flushing needed.
* Give the data a push (in pipeline mode, only if we're past the size
* threshold). In nonblock mode, don't complain if we're unable to send
* it all; PQgetResult() will do any additional flushing needed.
*/
if (pqFlush(conn) < 0)
if (pqPipelineFlush(conn) < 0)
goto sendFailed;
/* OK, it's launched! */
conn->asyncStatus = PGASYNC_BUSY;
return 1;
sendFailed:
pqRecycleCmdQueueEntry(conn, entry);
/* error message should be set up already */
return 0;
}
@ -1429,7 +1570,8 @@ PQsendQueryPrepared(PGconn *conn,
}
/*
* Common startup code for PQsendQuery and sibling routines
* PQsendQueryStart
* Common startup code for PQsendQuery and sibling routines
*/
static bool
PQsendQueryStart(PGconn *conn, bool newQuery)
@ -1450,20 +1592,57 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
libpq_gettext("no connection to the server\n"));
return false;
}
/* Can't send while already busy, either. */
if (conn->asyncStatus != PGASYNC_IDLE)
/* Can't send while already busy, either, unless enqueuing for later */
if (conn->asyncStatus != PGASYNC_IDLE &&
conn->pipelineStatus == PQ_PIPELINE_OFF)
{
appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("another command is already in progress\n"));
return false;
}
/* initialize async result-accumulation state */
pqClearAsyncResult(conn);
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
{
/*
* When enqueuing commands we don't change much of the connection
* state since it's already in use for the current command. The
* connection state will get updated when pqPipelineProcessQueue()
* advances to start processing the queued message.
*
* Just make sure we can safely enqueue given the current connection
* state. We can enqueue behind another queue item, or behind a
* non-queue command (one that sends its own sync), but we can't
* enqueue if the connection is in a copy state.
*/
switch (conn->asyncStatus)
{
case PGASYNC_IDLE:
case PGASYNC_READY:
case PGASYNC_READY_MORE:
case PGASYNC_BUSY:
/* ok to queue */
break;
case PGASYNC_COPY_IN:
case PGASYNC_COPY_OUT:
case PGASYNC_COPY_BOTH:
appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("cannot queue commands during COPY\n"));
return false;
}
}
else
{
/*
* This command's results will come in immediately. Initialize async
* result-accumulation state
*/
pqClearAsyncResult(conn);
/* reset single-row processing mode */
conn->singleRowMode = false;
/* reset single-row processing mode */
conn->singleRowMode = false;
}
/* ready to send command message */
return true;
}
@ -1487,10 +1666,16 @@ PQsendQueryGuts(PGconn *conn,
int resultFormat)
{
int i;
PGcmdQueueEntry *entry;
entry = pqAllocCmdQueueEntry(conn);
if (entry == NULL)
return 0; /* error msg already set */
/*
* We will send Parse (if needed), Bind, Describe Portal, Execute, Sync,
* using specified statement name and the unnamed portal.
* We will send Parse (if needed), Bind, Describe Portal, Execute, Sync
* (if not in pipeline mode), using specified statement name and the
* unnamed portal.
*/
if (command)
@ -1600,35 +1785,38 @@ PQsendQueryGuts(PGconn *conn,
pqPutMsgEnd(conn) < 0)
goto sendFailed;
/* construct the Sync message */
if (pqPutMsgStart('S', conn) < 0 ||
pqPutMsgEnd(conn) < 0)
goto sendFailed;
/* construct the Sync message if not in pipeline mode */
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
{
if (pqPutMsgStart('S', conn) < 0 ||
pqPutMsgEnd(conn) < 0)
goto sendFailed;
}
/* remember we are using extended query protocol */
conn->queryclass = PGQUERY_EXTENDED;
entry->queryclass = PGQUERY_EXTENDED;
/* and remember the query text too, if possible */
/* if insufficient memory, last_query just winds up NULL */
if (conn->last_query)
free(conn->last_query);
/* if insufficient memory, query just winds up NULL */
if (command)
conn->last_query = strdup(command);
else
conn->last_query = NULL;
entry->query = strdup(command);
/*
* Give the data a push. In nonblock mode, don't complain if we're unable
* to send it all; PQgetResult() will do any additional flushing needed.
* Give the data a push (in pipeline mode, only if we're past the size
* threshold). In nonblock mode, don't complain if we're unable to send
* it all; PQgetResult() will do any additional flushing needed.
*/
if (pqFlush(conn) < 0)
if (pqPipelineFlush(conn) < 0)
goto sendFailed;
/* OK, it's launched! */
conn->asyncStatus = PGASYNC_BUSY;
pqAppendCmdQueueEntry(conn, entry);
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
conn->asyncStatus = PGASYNC_BUSY;
return 1;
sendFailed:
pqRecycleCmdQueueEntry(conn, entry);
/* error message should be set up already */
return 0;
}
@ -1647,8 +1835,9 @@ PQsetSingleRowMode(PGconn *conn)
return 0;
if (conn->asyncStatus != PGASYNC_BUSY)
return 0;
if (conn->queryclass != PGQUERY_SIMPLE &&
conn->queryclass != PGQUERY_EXTENDED)
if (!conn->cmd_queue_head ||
(conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE &&
conn->cmd_queue_head->queryclass != PGQUERY_EXTENDED))
return 0;
if (conn->result)
return 0;
@ -1726,14 +1915,17 @@ PQisBusy(PGconn *conn)
return conn->asyncStatus == PGASYNC_BUSY || conn->write_failed;
}
/*
* PQgetResult
* Get the next PGresult produced by a query. Returns NULL if no
* query work remains or an error has occurred (e.g. out of
* memory).
*
* In pipeline mode, once all the result of a query have been returned,
* PQgetResult returns NULL to let the user know that the next
* query is being processed. At the end of the pipeline, returns a
* result with PQresultStatus(result) == PGRES_PIPELINE_SYNC.
*/
PGresult *
PQgetResult(PGconn *conn)
{
@ -1803,8 +1995,62 @@ PQgetResult(PGconn *conn)
{
case PGASYNC_IDLE:
res = NULL; /* query is complete */
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
{
/*
* We're about to return the NULL that terminates the round of
* results from the current query; prepare to send the results
* of the next query when we're called next. Also, since this
* is the start of the results of the next query, clear any
* prior error message.
*/
resetPQExpBuffer(&conn->errorMessage);
pqPipelineProcessQueue(conn);
}
break;
case PGASYNC_READY:
/*
* For any query type other than simple query protocol, we advance
* the command queue here. This is because for simple query
* protocol we can get the READY state multiple times before the
* command is actually complete, since the command string can
* contain many queries. In simple query protocol, the queue
* advance is done by fe-protocol3 when it receives ReadyForQuery.
*/
if (conn->cmd_queue_head &&
conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE)
pqCommandQueueAdvance(conn);
res = pqPrepareAsyncResult(conn);
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
{
/*
* We're about to send the results of the current query. Set
* us idle now, and ...
*/
conn->asyncStatus = PGASYNC_IDLE;
/*
* ... in cases when we're sending a pipeline-sync result,
* move queue processing forwards immediately, so that next
* time we're called, we're prepared to return the next result
* received from the server. In all other cases, leave the
* queue state change for next time, so that a terminating
* NULL result is sent.
*
* (In other words: we don't return a NULL after a pipeline
* sync.)
*/
if (res && res->resultStatus == PGRES_PIPELINE_SYNC)
pqPipelineProcessQueue(conn);
}
else
{
/* Set the state back to BUSY, allowing parsing to proceed. */
conn->asyncStatus = PGASYNC_BUSY;
}
break;
case PGASYNC_READY_MORE:
res = pqPrepareAsyncResult(conn);
/* Set the state back to BUSY, allowing parsing to proceed. */
conn->asyncStatus = PGASYNC_BUSY;
@ -1985,6 +2231,13 @@ PQexecStart(PGconn *conn)
if (!conn)
return false;
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
{
appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("synchronous command execution functions are not allowed in pipeline mode\n"));
return false;
}
/*
* Since this is the beginning of a query cycle, reset the error buffer.
*/
@ -2148,6 +2401,8 @@ PQsendDescribePortal(PGconn *conn, const char *portal)
static int
PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
{
PGcmdQueueEntry *entry = NULL;
/* Treat null desc_target as empty string */
if (!desc_target)
desc_target = "";
@ -2155,6 +2410,10 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
if (!PQsendQueryStart(conn, true))
return 0;
entry = pqAllocCmdQueueEntry(conn);
if (entry == NULL)
return 0; /* error msg already set */
/* construct the Describe message */
if (pqPutMsgStart('D', conn) < 0 ||
pqPutc(desc_type, conn) < 0 ||
@ -2163,32 +2422,32 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
goto sendFailed;
/* construct the Sync message */
if (pqPutMsgStart('S', conn) < 0 ||
pqPutMsgEnd(conn) < 0)
goto sendFailed;
/* remember we are doing a Describe */
conn->queryclass = PGQUERY_DESCRIBE;
/* reset last_query string (not relevant now) */
if (conn->last_query)
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
{
free(conn->last_query);
conn->last_query = NULL;
if (pqPutMsgStart('S', conn) < 0 ||
pqPutMsgEnd(conn) < 0)
goto sendFailed;
}
/* remember we are doing a Describe */
entry->queryclass = PGQUERY_DESCRIBE;
/*
* Give the data a push. In nonblock mode, don't complain if we're unable
* to send it all; PQgetResult() will do any additional flushing needed.
* Give the data a push (in pipeline mode, only if we're past the size
* threshold). In nonblock mode, don't complain if we're unable to send
* it all; PQgetResult() will do any additional flushing needed.
*/
if (pqFlush(conn) < 0)
if (pqPipelineFlush(conn) < 0)
goto sendFailed;
/* OK, it's launched! */
conn->asyncStatus = PGASYNC_BUSY;
pqAppendCmdQueueEntry(conn, entry);
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
conn->asyncStatus = PGASYNC_BUSY;
return 1;
sendFailed:
pqRecycleCmdQueueEntry(conn, entry);
/* error message should be set up already */
return 0;
}
@ -2327,7 +2586,8 @@ PQputCopyEnd(PGconn *conn, const char *errormsg)
* If we sent the COPY command in extended-query mode, we must issue a
* Sync as well.
*/
if (conn->queryclass != PGQUERY_SIMPLE)
if (conn->cmd_queue_head &&
conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE)
{
if (pqPutMsgStart('S', conn) < 0 ||
pqPutMsgEnd(conn) < 0)
@ -2541,6 +2801,13 @@ PQfn(PGconn *conn,
*/
resetPQExpBuffer(&conn->errorMessage);
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
{
appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("PQfn not allowed in pipeline mode\n"));
return NULL;
}
if (conn->sock == PGINVALID_SOCKET || conn->asyncStatus != PGASYNC_IDLE ||
conn->result != NULL)
{
@ -2555,6 +2822,277 @@ PQfn(PGconn *conn,
args, nargs);
}
/* ====== Pipeline mode support ======== */
/*
* PQenterPipelineMode
* Put an idle connection in pipeline mode.
*
* Returns 1 on success. On failure, errorMessage is set and 0 is returned.
*
* Commands submitted after this can be pipelined on the connection;
* there's no requirement to wait for one to finish before the next is
* dispatched.
*
* Queuing of a new query or syncing during COPY is not allowed.
*
* A set of commands is terminated by a PQpipelineSync. Multiple sync
* points can be established while in pipeline mode. Pipeline mode can
* be exited by calling PQexitPipelineMode() once all results are processed.
*
* This doesn't actually send anything on the wire, it just puts libpq
* into a state where it can pipeline work.
*/
int
PQenterPipelineMode(PGconn *conn)
{
if (!conn)
return 0;
/* succeed with no action if already in pipeline mode */
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
return 1;
if (conn->asyncStatus != PGASYNC_IDLE)
{
appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("cannot enter pipeline mode, connection not idle\n"));
return 0;
}
conn->pipelineStatus = PQ_PIPELINE_ON;
return 1;
}
/*
* PQexitPipelineMode
* End pipeline mode and return to normal command mode.
*
* Returns 1 in success (pipeline mode successfully ended, or not in pipeline
* mode).
*
* Returns 0 if in pipeline mode and cannot be ended yet. Error message will
* be set.
*/
int
PQexitPipelineMode(PGconn *conn)
{
if (!conn)
return 0;
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
return 1;
switch (conn->asyncStatus)
{
case PGASYNC_READY:
case PGASYNC_READY_MORE:
/* there are some uncollected results */
appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("cannot exit pipeline mode with uncollected results\n"));
return 0;
case PGASYNC_BUSY:
appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("cannot exit pipeline mode while busy\n"));
return 0;
default:
/* OK */
break;
}
/* still work to process */
if (conn->cmd_queue_head != NULL)
{
appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("cannot exit pipeline mode with uncollected results\n"));
return 0;
}
conn->pipelineStatus = PQ_PIPELINE_OFF;
conn->asyncStatus = PGASYNC_IDLE;
/* Flush any pending data in out buffer */
if (pqFlush(conn) < 0)
return 0; /* error message is setup already */
return 1;
}
/*
* pqCommandQueueAdvance
* Remove one query from the command queue, when we receive
* all results from the server that pertain to it.
*/
void
pqCommandQueueAdvance(PGconn *conn)
{
PGcmdQueueEntry *prevquery;
if (conn->cmd_queue_head == NULL)
return;
/* delink from queue */
prevquery = conn->cmd_queue_head;
conn->cmd_queue_head = conn->cmd_queue_head->next;
/* and make it recyclable */
prevquery->next = NULL;
pqRecycleCmdQueueEntry(conn, prevquery);
}
/*
* pqPipelineProcessQueue: subroutine for PQgetResult
* In pipeline mode, start processing the results of the next query in the queue.
*/
void
pqPipelineProcessQueue(PGconn *conn)
{
switch (conn->asyncStatus)
{
case PGASYNC_COPY_IN:
case PGASYNC_COPY_OUT:
case PGASYNC_COPY_BOTH:
case PGASYNC_READY:
case PGASYNC_READY_MORE:
case PGASYNC_BUSY:
/* client still has to process current query or results */
return;
case PGASYNC_IDLE:
/* next query please */
break;
}
/* Nothing to do if not in pipeline mode, or queue is empty */
if (conn->pipelineStatus == PQ_PIPELINE_OFF ||
conn->cmd_queue_head == NULL)
return;
/* Initialize async result-accumulation state */
pqClearAsyncResult(conn);
/*
* Reset single-row processing mode. (Client has to set it up for each
* query, if desired.)
*/
conn->singleRowMode = false;
if (conn->pipelineStatus == PQ_PIPELINE_ABORTED &&
conn->cmd_queue_head->queryclass != PGQUERY_SYNC)
{
/*
* In an aborted pipeline we don't get anything from the server for
* each result; we're just discarding commands from the queue until we
* get to the next sync from the server.
*
* The PGRES_PIPELINE_ABORTED results tell the client that its queries
* got aborted.
*/
conn->result = PQmakeEmptyPGresult(conn, PGRES_PIPELINE_ABORTED);
if (!conn->result)
{
appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("out of memory\n"));
pqSaveErrorResult(conn);
return;
}
conn->asyncStatus = PGASYNC_READY;
}
else
{
/* allow parsing to continue */
conn->asyncStatus = PGASYNC_BUSY;
}
}
/*
* PQpipelineSync
* Send a Sync message as part of a pipeline, and flush to server
*
* It's legal to start submitting more commands in the pipeline immediately,
* without waiting for the results of the current pipeline. There's no need to
* end pipeline mode and start it again.
*
* If a command in a pipeline fails, every subsequent command up to and including
* the result to the Sync message sent by PQpipelineSync gets set to
* PGRES_PIPELINE_ABORTED state. If the whole pipeline is processed without
* error, a PGresult with PGRES_PIPELINE_SYNC is produced.
*
* Queries can already have been sent before PQpipelineSync is called, but
* PQpipelineSync need to be called before retrieving command results.
*
* The connection will remain in pipeline mode and unavailable for new
* synchronous command execution functions until all results from the pipeline
* are processed by the client.
*/
int
PQpipelineSync(PGconn *conn)
{
PGcmdQueueEntry *entry;
if (!conn)
return 0;
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
{
appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("cannot send pipeline when not in pipeline mode\n"));
return 0;
}
switch (conn->asyncStatus)
{
case PGASYNC_COPY_IN:
case PGASYNC_COPY_OUT:
case PGASYNC_COPY_BOTH:
/* should be unreachable */
appendPQExpBufferStr(&conn->errorMessage,
"internal error: cannot send pipeline while in COPY\n");
return 0;
case PGASYNC_READY:
case PGASYNC_READY_MORE:
case PGASYNC_BUSY:
case PGASYNC_IDLE:
/* OK to send sync */
break;
}
entry = pqAllocCmdQueueEntry(conn);
if (entry == NULL)
return 0; /* error msg already set */
entry->queryclass = PGQUERY_SYNC;
entry->query = NULL;
/* construct the Sync message */
if (pqPutMsgStart('S', conn) < 0 ||
pqPutMsgEnd(conn) < 0)
goto sendFailed;
pqAppendCmdQueueEntry(conn, entry);
/*
* Give the data a push. In nonblock mode, don't complain if we're unable
* to send it all; PQgetResult() will do any additional flushing needed.
*/
if (PQflush(conn) < 0)
goto sendFailed;
/*
* Call pqPipelineProcessQueue so the user can call start calling
* PQgetResult.
*/
pqPipelineProcessQueue(conn);
return 1;
sendFailed:
pqRecycleCmdQueueEntry(conn, entry);
/* error message should be set up already */
return 0;
}
/* ====== accessor funcs for PGresult ======== */
@ -2569,7 +3107,7 @@ PQresultStatus(const PGresult *res)
char *
PQresStatus(ExecStatusType status)
{
if ((unsigned int) status >= sizeof pgresStatus / sizeof pgresStatus[0])
if ((unsigned int) status >= lengthof(pgresStatus))
return libpq_gettext("invalid ExecStatusType code");
return pgresStatus[status];
}
@ -3152,6 +3690,23 @@ PQflush(PGconn *conn)
return pqFlush(conn);
}
/*
* pqPipelineFlush
*
* In pipeline mode, data will be flushed only when the out buffer reaches the
* threshold value. In non-pipeline mode, it behaves as stock pqFlush.
*
* Returns 0 on success.
*/
static int
pqPipelineFlush(PGconn *conn)
{
if ((conn->pipelineStatus != PQ_PIPELINE_ON) ||
(conn->outCount >= OUTBUFFER_THRESHOLD))
return pqFlush(conn);
return 0;
}
/*
* PQfreemem - safely frees memory allocated

View File

@ -158,6 +158,18 @@ pqParseInput3(PGconn *conn)
if (conn->asyncStatus != PGASYNC_IDLE)
return;
/*
* We're also notionally not-IDLE when in pipeline mode the state
* says "idle" (so we have completed receiving the results of one
* query from the server and dispatched them to the application)
* but another query is queued; yield back control to caller so
* that they can initiate processing of the next query in the
* queue.
*/
if (conn->pipelineStatus != PQ_PIPELINE_OFF &&
conn->cmd_queue_head != NULL)
return;
/*
* Unexpected message in IDLE state; need to recover somehow.
* ERROR messages are handled using the notice processor;
@ -179,6 +191,7 @@ pqParseInput3(PGconn *conn)
}
else
{
/* Any other case is unexpected and we summarily skip it */
pqInternalNotice(&conn->noticeHooks,
"message type 0x%02x arrived from server while idle",
id);
@ -217,10 +230,37 @@ pqParseInput3(PGconn *conn)
return;
conn->asyncStatus = PGASYNC_READY;
break;
case 'Z': /* backend is ready for new query */
case 'Z': /* sync response, backend is ready for new
* query */
if (getReadyForQuery(conn))
return;
conn->asyncStatus = PGASYNC_IDLE;
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
{
conn->result = PQmakeEmptyPGresult(conn,
PGRES_PIPELINE_SYNC);
if (!conn->result)
{
appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("out of memory"));
pqSaveErrorResult(conn);
}
else
{
conn->pipelineStatus = PQ_PIPELINE_ON;
conn->asyncStatus = PGASYNC_READY;
}
}
else
{
/*
* In simple query protocol, advance the command queue
* (see PQgetResult).
*/
if (conn->cmd_queue_head &&
conn->cmd_queue_head->queryclass == PGQUERY_SIMPLE)
pqCommandQueueAdvance(conn);
conn->asyncStatus = PGASYNC_IDLE;
}
break;
case 'I': /* empty query */
if (conn->result == NULL)
@ -238,7 +278,8 @@ pqParseInput3(PGconn *conn)
break;
case '1': /* Parse Complete */
/* If we're doing PQprepare, we're done; else ignore */
if (conn->queryclass == PGQUERY_PREPARE)
if (conn->cmd_queue_head &&
conn->cmd_queue_head->queryclass == PGQUERY_PREPARE)
{
if (conn->result == NULL)
{
@ -285,7 +326,8 @@ pqParseInput3(PGconn *conn)
conn->inCursor += msgLength;
}
else if (conn->result == NULL ||
conn->queryclass == PGQUERY_DESCRIBE)
(conn->cmd_queue_head &&
conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE))
{
/* First 'T' in a query sequence */
if (getRowDescriptions(conn, msgLength))
@ -316,7 +358,8 @@ pqParseInput3(PGconn *conn)
* instead of PGRES_TUPLES_OK. Otherwise we can just
* ignore this message.
*/
if (conn->queryclass == PGQUERY_DESCRIBE)
if (conn->cmd_queue_head &&
conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE)
{
if (conn->result == NULL)
{
@ -445,7 +488,7 @@ handleSyncLoss(PGconn *conn, char id, int msgLength)
id, msgLength);
/* build an error result holding the error message */
pqSaveErrorResult(conn);
conn->asyncStatus = PGASYNC_READY; /* drop out of GetResult wait loop */
conn->asyncStatus = PGASYNC_READY; /* drop out of PQgetResult wait loop */
/* flush input data since we're giving up on processing it */
pqDropConnection(conn, true);
conn->status = CONNECTION_BAD; /* No more connection to backend */
@ -471,7 +514,9 @@ getRowDescriptions(PGconn *conn, int msgLength)
* PGresult created by getParamDescriptions, and we should fill data into
* that. Otherwise, create a new, empty PGresult.
*/
if (conn->queryclass == PGQUERY_DESCRIBE)
if (!conn->cmd_queue_head ||
(conn->cmd_queue_head &&
conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE))
{
if (conn->result)
result = conn->result;
@ -568,7 +613,9 @@ getRowDescriptions(PGconn *conn, int msgLength)
* If we're doing a Describe, we're done, and ready to pass the result
* back to the client.
*/
if (conn->queryclass == PGQUERY_DESCRIBE)
if ((!conn->cmd_queue_head) ||
(conn->cmd_queue_head &&
conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE))
{
conn->asyncStatus = PGASYNC_READY;
return 0;
@ -841,6 +888,10 @@ pqGetErrorNotice3(PGconn *conn, bool isError)
PQExpBufferData workBuf;
char id;
/* If in pipeline mode, set error indicator for it */
if (isError && conn->pipelineStatus != PQ_PIPELINE_OFF)
conn->pipelineStatus = PQ_PIPELINE_ABORTED;
/*
* If this is an error message, pre-emptively clear any incomplete query
* result we may have. We'd just throw it away below anyway, and
@ -897,8 +948,8 @@ pqGetErrorNotice3(PGconn *conn, bool isError)
* might need it for an error cursor display, which is only true if there
* is a PG_DIAG_STATEMENT_POSITION field.
*/
if (have_position && conn->last_query && res)
res->errQuery = pqResultStrdup(res, conn->last_query);
if (have_position && res && conn->cmd_queue_head && conn->cmd_queue_head->query)
res->errQuery = pqResultStrdup(res, conn->cmd_queue_head->query);
/*
* Now build the "overall" error message for PQresultErrorMessage.
@ -1817,7 +1868,8 @@ pqEndcopy3(PGconn *conn)
* If we sent the COPY command in extended-query mode, we must issue a
* Sync as well.
*/
if (conn->queryclass != PGQUERY_SIMPLE)
if (conn->cmd_queue_head &&
conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE)
{
if (pqPutMsgStart('S', conn) < 0 ||
pqPutMsgEnd(conn) < 0)
@ -1897,6 +1949,9 @@ pqFunctionCall3(PGconn *conn, Oid fnid,
int avail;
int i;
/* already validated by PQfn */
Assert(conn->pipelineStatus == PQ_PIPELINE_OFF);
/* PQfn already validated connection state */
if (pqPutMsgStart('F', conn) < 0 || /* function call msg */

View File

@ -96,7 +96,10 @@ typedef enum
PGRES_NONFATAL_ERROR, /* notice or warning message */
PGRES_FATAL_ERROR, /* query failed */
PGRES_COPY_BOTH, /* Copy In/Out data transfer in progress */
PGRES_SINGLE_TUPLE /* single tuple from larger resultset */
PGRES_SINGLE_TUPLE, /* single tuple from larger resultset */
PGRES_PIPELINE_SYNC, /* pipeline synchronization point */
PGRES_PIPELINE_ABORTED, /* Command didn't run because of an abort
* earlier in a pipeline */
} ExecStatusType;
typedef enum
@ -136,6 +139,16 @@ typedef enum
PQPING_NO_ATTEMPT /* connection not attempted (bad params) */
} PGPing;
/*
* PGpipelineStatus - Current status of pipeline mode
*/
typedef enum
{
PQ_PIPELINE_OFF,
PQ_PIPELINE_ON,
PQ_PIPELINE_ABORTED
} PGpipelineStatus;
/* PGconn encapsulates a connection to the backend.
* The contents of this struct are not supposed to be known to applications.
*/
@ -327,6 +340,7 @@ extern int PQserverVersion(const PGconn *conn);
extern char *PQerrorMessage(const PGconn *conn);
extern int PQsocket(const PGconn *conn);
extern int PQbackendPID(const PGconn *conn);
extern PGpipelineStatus PQpipelineStatus(const PGconn *conn);
extern int PQconnectionNeedsPassword(const PGconn *conn);
extern int PQconnectionUsedPassword(const PGconn *conn);
extern int PQclientEncoding(const PGconn *conn);
@ -434,6 +448,11 @@ extern PGresult *PQgetResult(PGconn *conn);
extern int PQisBusy(PGconn *conn);
extern int PQconsumeInput(PGconn *conn);
/* Routines for pipeline mode management */
extern int PQenterPipelineMode(PGconn *conn);
extern int PQexitPipelineMode(PGconn *conn);
extern int PQpipelineSync(PGconn *conn);
/* LISTEN/NOTIFY support */
extern PGnotify *PQnotifies(PGconn *conn);

View File

@ -217,21 +217,16 @@ typedef enum
{
PGASYNC_IDLE, /* nothing's happening, dude */
PGASYNC_BUSY, /* query in progress */
PGASYNC_READY, /* result ready for PQgetResult */
PGASYNC_READY, /* query done, waiting for client to fetch
* result */
PGASYNC_READY_MORE, /* query done, waiting for client to fetch
* result, more results expected from this
* query */
PGASYNC_COPY_IN, /* Copy In data transfer in progress */
PGASYNC_COPY_OUT, /* Copy Out data transfer in progress */
PGASYNC_COPY_BOTH /* Copy In/Out data transfer in progress */
} PGAsyncStatusType;
/* PGQueryClass tracks which query protocol we are now executing */
typedef enum
{
PGQUERY_SIMPLE, /* simple Query protocol (PQexec) */
PGQUERY_EXTENDED, /* full Extended protocol (PQexecParams) */
PGQUERY_PREPARE, /* Parse only (PQprepare) */
PGQUERY_DESCRIBE /* Describe Statement or Portal */
} PGQueryClass;
/* Target server type (decoded value of target_session_attrs) */
typedef enum
{
@ -305,6 +300,29 @@ typedef enum pg_conn_host_type
CHT_UNIX_SOCKET
} pg_conn_host_type;
/*
* PGQueryClass tracks which query protocol is in use for each command queue
* entry, or special operation in execution
*/
typedef enum
{
PGQUERY_SIMPLE, /* simple Query protocol (PQexec) */
PGQUERY_EXTENDED, /* full Extended protocol (PQexecParams) */
PGQUERY_PREPARE, /* Parse only (PQprepare) */
PGQUERY_DESCRIBE, /* Describe Statement or Portal */
PGQUERY_SYNC /* Sync (at end of a pipeline) */
} PGQueryClass;
/*
* An entry in the pending command queue.
*/
typedef struct PGcmdQueueEntry
{
PGQueryClass queryclass; /* Query type */
char *query; /* SQL command, or NULL if none/unknown/OOM */
struct PGcmdQueueEntry *next; /* list link */
} PGcmdQueueEntry;
/*
* pg_conn_host stores all information about each of possibly several hosts
* mentioned in the connection string. Most fields are derived by splitting
@ -389,12 +407,11 @@ struct pg_conn
ConnStatusType status;
PGAsyncStatusType asyncStatus;
PGTransactionStatusType xactStatus; /* never changes to ACTIVE */
PGQueryClass queryclass;
char *last_query; /* last SQL command, or NULL if unknown */
char last_sqlstate[6]; /* last reported SQLSTATE */
bool options_valid; /* true if OK to attempt connection */
bool nonblocking; /* whether this connection is using nonblock
* sending semantics */
PGpipelineStatus pipelineStatus; /* status of pipeline mode */
bool singleRowMode; /* return current query result row-by-row? */
char copy_is_binary; /* 1 = copy binary, 0 = copy text */
int copy_already_done; /* # bytes already returned in COPY OUT */
@ -407,6 +424,19 @@ struct pg_conn
pg_conn_host *connhost; /* details about each named host */
char *connip; /* IP address for current network connection */
/*
* The pending command queue as a singly-linked list. Head is the command
* currently in execution, tail is where new commands are added.
*/
PGcmdQueueEntry *cmd_queue_head;
PGcmdQueueEntry *cmd_queue_tail;
/*
* To save malloc traffic, we don't free entries right away; instead we
* save them in this list for possible reuse.
*/
PGcmdQueueEntry *cmd_queue_recycle;
/* Connection data */
pgsocket sock; /* FD for socket, PGINVALID_SOCKET if
* unconnected */
@ -622,6 +652,7 @@ extern void pqSaveMessageField(PGresult *res, char code,
extern void pqSaveParameterStatus(PGconn *conn, const char *name,
const char *value);
extern int pqRowProcessor(PGconn *conn, const char **errmsgp);
extern void pqCommandQueueAdvance(PGconn *conn);
extern int PQsendQueryContinue(PGconn *conn, const char *query);
/* === in fe-protocol3.c === */
@ -795,6 +826,11 @@ extern ssize_t pg_GSS_read(PGconn *conn, void *ptr, size_t len);
*/
#define pqIsnonblocking(conn) ((conn)->nonblocking)
/*
* Connection's outbuffer threshold, for pipeline mode.
*/
#define OUTBUFFER_THRESHOLD 65536
#ifdef ENABLE_NLS
extern char *libpq_gettext(const char *msgid) pg_attribute_format_arg(1);
extern char *libpq_ngettext(const char *msgid, const char *msgid_plural, unsigned long n) pg_attribute_format_arg(1) pg_attribute_format_arg(2);

View File

@ -10,6 +10,7 @@ SUBDIRS = \
delay_execution \
dummy_index_am \
dummy_seclabel \
libpq_pipeline \
plsample \
snapshot_too_old \
test_bloomfilter \

View File

@ -0,0 +1,5 @@
# Generated subdirectories
/log/
/results/
/tmp_check/
/libpq_pipeline

View File

@ -0,0 +1,20 @@
# src/test/modules/libpq_pipeline/Makefile
PROGRAM = libpq_pipeline
OBJS = libpq_pipeline.o
PG_CPPFLAGS = -I$(libpq_srcdir)
PG_LIBS_INTERNAL += $(libpq_pgport)
TAP_TESTS = 1
ifdef USE_PGXS
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)
else
subdir = src/test/modules/libpq_pipeline
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
include $(top_srcdir)/contrib/contrib-global.mk
endif

View File

@ -0,0 +1 @@
Test programs and libraries for libpq

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,28 @@
use strict;
use warnings;
use Config;
use PostgresNode;
use TestLib;
use Test::More tests => 8;
use Cwd;
my $node = get_new_node('main');
$node->init;
$node->start;
my $numrows = 10000;
$ENV{PATH} = "$ENV{PATH}:" . getcwd();
my ($out, $err) = run_command(['libpq_pipeline', 'tests']);
die "oops: $err" unless $err eq '';
my @tests = split(/\s/, $out);
for my $testname (@tests)
{
$node->command_ok(
[ 'libpq_pipeline', $testname, $node->connstr('postgres'), $numrows ],
"libpq_pipeline $testname");
}
$node->stop('fast');

View File

@ -33,10 +33,11 @@ my @unlink_on_exit;
# Set of variables for modules in contrib/ and src/test/modules/
my $contrib_defines = { 'refint' => 'REFINT_VERBOSE' };
my @contrib_uselibpq = ('dblink', 'oid2name', 'postgres_fdw', 'vacuumlo');
my @contrib_uselibpgport = ('oid2name', 'vacuumlo');
my @contrib_uselibpgcommon = ('oid2name', 'vacuumlo');
my $contrib_extralibs = undef;
my @contrib_uselibpq =
('dblink', 'oid2name', 'postgres_fdw', 'vacuumlo', 'libpq_pipeline');
my @contrib_uselibpgport = ('libpq_pipeline', 'oid2name', 'vacuumlo');
my @contrib_uselibpgcommon = ('libpq_pipeline', 'oid2name', 'vacuumlo');
my $contrib_extralibs = { 'libpq_pipeline' => ['ws2_32.lib'] };
my $contrib_extraincludes = { 'dblink' => ['src/backend'] };
my $contrib_extrasource = {
'cube' => [ 'contrib/cube/cubescan.l', 'contrib/cube/cubeparse.y' ],

View File

@ -1563,10 +1563,12 @@ PG_Locale_Strategy
PG_Lock_Status
PG_init_t
PGcancel
PGcmdQueueEntry
PGconn
PGdataValue
PGlobjfuncs
PGnotify
PGpipelineStatus
PGresAttDesc
PGresAttValue
PGresParamDesc