From acb7e4eb6b1c614c68a62fb3a6a5bba1af0a2659 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera Date: Mon, 15 Mar 2021 18:13:42 -0300 Subject: [PATCH] Implement pipeline mode in libpq MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pipeline mode in libpq lets an application avoid the Sync messages in the FE/BE protocol that are implicit in the old libpq API after each query. The application can then insert Sync at its leisure with a new libpq function PQpipelineSync. This can lead to substantial reductions in query latency. Co-authored-by: Craig Ringer Co-authored-by: Matthieu Garrigues Co-authored-by: Álvaro Herrera Reviewed-by: Andres Freund Reviewed-by: Aya Iwata Reviewed-by: Daniel Vérité Reviewed-by: David G. Johnston Reviewed-by: Justin Pryzby Reviewed-by: Kirk Jamison Reviewed-by: Michael Paquier Reviewed-by: Nikhil Sontakke Reviewed-by: Vaishnavi Prabakaran Reviewed-by: Zhihong Yu Discussion: https://postgr.es/m/CAMsr+YFUjJytRyV4J-16bEoiZyH=4nj+sQ7JP9ajwz=B4dMMZw@mail.gmail.com Discussion: https://postgr.es/m/CAJkzx4T5E-2cQe3dtv2R78dYFvz+in8PY7A8MArvLhs_pg75gg@mail.gmail.com --- doc/src/sgml/libpq.sgml | 522 ++++++- doc/src/sgml/lobj.sgml | 4 + .../libpqwalreceiver/libpqwalreceiver.c | 6 + src/bin/pg_amcheck/pg_amcheck.c | 2 + src/interfaces/libpq/exports.txt | 4 + src/interfaces/libpq/fe-connect.c | 37 +- src/interfaces/libpq/fe-exec.c | 717 ++++++++- src/interfaces/libpq/fe-protocol3.c | 77 +- src/interfaces/libpq/libpq-fe.h | 21 +- src/interfaces/libpq/libpq-int.h | 60 +- src/test/modules/Makefile | 1 + src/test/modules/libpq_pipeline/.gitignore | 5 + src/test/modules/libpq_pipeline/Makefile | 20 + src/test/modules/libpq_pipeline/README | 1 + .../modules/libpq_pipeline/libpq_pipeline.c | 1303 +++++++++++++++++ .../libpq_pipeline/t/001_libpq_pipeline.pl | 28 + src/tools/msvc/Mkvcbuild.pm | 9 +- src/tools/pgindent/typedefs.list | 2 + 18 files changed, 2706 insertions(+), 113 deletions(-) create mode 100644 src/test/modules/libpq_pipeline/.gitignore create mode 100644 src/test/modules/libpq_pipeline/Makefile create mode 100644 src/test/modules/libpq_pipeline/README create mode 100644 src/test/modules/libpq_pipeline/libpq_pipeline.c create mode 100644 src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index 910e9a81ea..be674fbaa9 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -3180,6 +3180,33 @@ ExecStatusType PQresultStatus(const PGresult *res); + + + PGRES_PIPELINE_SYNC + + + The PGresult represents a + synchronization point in pipeline mode, requested by + . + This status occurs only when pipeline mode has been selected. + + + + + + PGRES_PIPELINE_ABORTED + + + The PGresult represents a pipeline that has + received an error from the server. PQgetResult + must be called repeatedly, and each time it will return this status code + until the end of the current pipeline, at which point it will return + PGRES_PIPELINE_SYNC and normal processing can + resume. + + + + If the result status is PGRES_TUPLES_OK or @@ -4677,8 +4704,9 @@ int PQsendDescribePortal(PGconn *conn, const char *portalName); , , , - , or - + , + , or + call, and returns it. A null pointer is returned when the command is complete and there will be no more results. @@ -4702,6 +4730,19 @@ PGresult *PQgetResult(PGconn *conn); . + + In pipeline mode, PQgetResult will return normally + unless an error occurs; for any subsequent query sent after the one + that caused the error until (and excluding) the next synchronization point, + a special result of type PGRES_PIPELINE_ABORTED will + be returned, and a null pointer will be returned after it. + When the pipeline synchronization point is reached, a result of type + PGRES_PIPELINE_SYNC will be returned. + The result of the next query after the synchronization point follows + immediately (that is, no null pointer is returned after + the synchronization point.) + + Even when indicates a fatal @@ -4926,6 +4967,476 @@ int PQflush(PGconn *conn); + + Pipeline Mode + + + libpq + pipeline mode + + + + pipelining + in libpq + + + + batch mode + in libpq + + + + libpq pipeline mode allows applications to + send a query without having to read the result of the previously + sent query. Taking advantage of the pipeline mode, a client will wait + less for the server, since multiple queries/results can be + sent/received in a single network transaction. + + + + While pipeline mode provides a significant performance boost, writing + clients using the pipeline mode is more complex because it involves + managing a queue of pending queries and finding which result + corresponds to which query in the queue. + + + + Pipeline mode also generally consumes more memory on both the client and server, + though careful and aggressive management of the send/receive queue can mitigate + this. This applies whether or not the connection is in blocking or non-blocking + mode. + + + + While the pipeline API was introduced in + PostgreSQL 14, it is a client-side feature + which doesn't require special server support, and works on any server + that supports the v3 extended query protocol. + + + + Using Pipeline Mode + + + To issue pipelines, the application must switch the connection + into pipeline mode, + which is done with . + can be used + to test whether pipeline mode is active. + In pipeline mode, only asynchronous operations + are permitted, and COPY is disallowed. + Using synchronous command execution functions + such as PQfn, + PQexec, + PQexecParams, + PQprepare, + PQexecPrepared, + PQdescribePrepared, + PQdescribePortal, + is an error condition. + Once all dispatched commands have had their results processed, and + the end pipeline result has been consumed, the application may return + to non-pipelined mode with . + + + + + It is best to use pipeline mode with libpq in + non-blocking mode. If used + in blocking mode it is possible for a client/server deadlock to occur. + + + The client will block trying to send queries to the server, but the + server will block trying to send results to the client from queries + it has already processed. This only occurs when the client sends + enough queries to fill both its output buffer and the server's receive + buffer before it switches to processing input from the server, + but it's hard to predict exactly when that will happen. + + + + + + + Issuing Queries + + + After entering pipeline mode, the application dispatches requests using + , + , + or its prepared-query sibling + . + These requests are queued on the client-side until flushed to the server; + this occurs when is used to + establish a synchronization point in the pipeline, + or when is called. + The functions , + , and + also work in pipeline mode. + Result processing is described below. + + + + The server executes statements, and returns results, in the order the + client sends them. The server will begin executing the commands in the + pipeline immediately, not waiting for the end of the pipeline. + If any statement encounters an error, the server aborts the current + transaction and does not execute any subsequent command in the queue + until the next synchronization point established by + PQpipelineSync; + a PGRES_PIPELINE_ABORTED result is produced for + each such command. + (This remains true even if the commands in the pipeline would rollback + the transaction.) + Query processing resumes after the synchronization point. + + + + It's fine for one operation to depend on the results of a + prior one; for example, one query may define a table that the next + query in the same pipeline uses. Similarly, an application may + create a named prepared statement and execute it with later + statements in the same pipeline. + + + + + Processing Results + + + To process the result of one query in a pipeline, the application calls + PQgetResult repeatedly and handles each result + until PQgetResult returns null. + The result from the next query in the pipeline may then be retrieved using + PQgetResult again and the cycle repeated. + The application handles individual statement results as normal. + When the results of all the queries in the pipeline have been + returned, PQgetResult returns a result + containing the status value PGRES_PIPELINE_SYNC + + + + The client may choose to defer result processing until the complete + pipeline has been sent, or interleave that with sending further + queries in the pipeline; see . + + + + To enter single-row mode, call PQsetSingleRowMode + before retrieving results with PQgetResult. + This mode selection is effective only for the query currently + being processed. For more information on the use of + PQsetSingleRowMode, + refer to . + + + + PQgetResult behaves the same as for normal + asynchronous processing except that it may contain the new + PGresult types PGRES_PIPELINE_SYNC + and PGRES_PIPELINE_ABORTED. + PGRES_PIPELINE_SYNC is reported exactly once for each + PQpipelineSync at the corresponding point + in the pipeline. + PGRES_PIPELINE_ABORTED is emitted in place of a normal + query result for the first error and all subsequent results + until the next PGRES_PIPELINE_SYNC; + see . + + + + PQisBusy, PQconsumeInput, etc + operate as normal when processing pipeline results. + + + + libpq does not provide any information to the + application about the query currently being processed (except that + PQgetResult returns null to indicate that we start + returning the results of next query). The application must keep track + of the order in which it sent queries, to associate them with their + corresponding results. + Applications will typically use a state machine or a FIFO queue for this. + + + + + + Error Handling + + + From the client's perspective, after PQresultStatus + returns PGRES_FATAL_ERROR, + the pipeline is flagged as aborted. + PQresultStatus will report a + PGRES_PIPELINE_ABORTED result for each remaining queued + operation in an aborted pipeline. The result for + PQpipelineSync is reported as + PGRES_PIPELINE_SYNC to signal the end of the aborted pipeline + and resumption of normal result processing. + + + + The client must process results with + PQgetResult during error recovery. + + + + If the pipeline used an implicit transaction, then operations that have + already executed are rolled back and operations that were queued to follow + the failed operation are skipped entirely. The same behavior holds if the + pipeline starts and commits a single explicit transaction (i.e. the first + statement is BEGIN and the last is + COMMIT) except that the session remains in an aborted + transaction state at the end of the pipeline. If a pipeline contains + multiple explicit transactions, all transactions that + committed prior to the error remain committed, the currently in-progress + transaction is aborted, and all subsequent operations are skipped completely, + including subsequent transactions. If a pipeline synchronization point + occurs with an explicit transaction block in aborted state, the next pipeline + will become aborted immediately unless the next command puts the transaction + in normal mode with ROLLBACK. + + + + + The client must not assume that work is committed when it + sends a COMMIT — only when the + corresponding result is received to confirm the commit is complete. + Because errors arrive asynchronously, the application needs to be able to + restart from the last received committed change and + resend work done after that point if something goes wrong. + + + + + + Interleaving Result Processing and Query Dispatch + + + To avoid deadlocks on large pipelines the client should be structured + around a non-blocking event loop using operating system facilities + such as select, poll, + WaitForMultipleObjectEx, etc. + + + + The client application should generally maintain a queue of work + remaining to be dispatched and a queue of work that has been dispatched + but not yet had its results processed. When the socket is writable + it should dispatch more work. When the socket is readable it should + read results and process them, matching them up to the next entry in + its corresponding results queue. Based on available memory, results from the + socket should be read frequently: there's no need to wait until the + pipeline end to read the results. Pipelines should be scoped to logical + units of work, usually (but not necessarily) one transaction per pipeline. + There's no need to exit pipeline mode and re-enter it between pipelines, + or to wait for one pipeline to finish before sending the next. + + + + An example using select() and a simple state + machine to track sent and received work is in + src/test/modules/libpq_pipeline/libpq_pipeline.c + in the PostgreSQL source distribution. + + + + + + Functions Associated with Pipeline Mode + + + + + PQpipelineStatusPQpipelineStatus + + + + Returns the current pipeline mode status of the + libpq connection. + +PGpipelineStatus PQpipelineStatus(const PGconn *conn); + + + + + PQpipelineStatus can return one of the following values: + + + + + PQ_PIPELINE_ON + + + + The libpq connection is in + pipeline mode. + + + + + + + PQ_PIPELINE_OFF + + + + The libpq connection is + not in pipeline mode. + + + + + + + PQ_PIPELINE_ABORTED + + + + The libpq connection is in pipeline + mode and an error occurred while processing the current pipeline. + The aborted flag is cleared when PQgetResult + returns a result of type PGRES_PIPELINE_SYNC. + + + + + + + + + + + PQenterPipelineModePQenterPipelineMode + + + + Causes a connection to enter pipeline mode if it is currently idle or + already in pipeline mode. + + +int PQenterPipelineMode(PGconn *conn); + + + + + Returns 1 for success. + Returns 0 and has no effect if the connection is not currently + idle, i.e., it has a result ready, or it is waiting for more + input from the server, etc. + This function does not actually send anything to the server, + it just changes the libpq connection + state. + + + + + + PQexitPipelineModePQexitPipelineMode + + + + Causes a connection to exit pipeline mode if it is currently in pipeline mode + with an empty queue and no pending results. + +int PQexitPipelineMode(PGconn *conn); + + + + Returns 1 for success. Returns 1 and takes no action if not in + pipeline mode. If the current statement isn't finished processing, + or PQgetResult has not been called to collect + results from all previously sent query, returns 0 (in which case, + use to get more information + about the failure). + + + + + + PQpipelineSyncPQpipelineSync + + + + Marks a synchronization point in a pipeline by sending a + sync message + and flushing the send buffer. This serves as + the delimiter of an implicit transaction and an error recovery + point; see . + + +int PQpipelineSync(PGconn *conn); + + + + Returns 1 for success. Returns 0 if the connection is not in + pipeline mode or sending a + sync message + failed. + + + + + + + + When to Use Pipeline Mode + + + Much like asynchronous query mode, there is no meaningful performance + overhead when using pipeline mode. It increases client application complexity, + and extra caution is required to prevent client/server deadlocks, but + pipeline mode can offer considerable performance improvements, in exchange for + increased memory usage from leaving state around longer. + + + + Pipeline mode is most useful when the server is distant, i.e., network latency + (ping time) is high, and also when many small operations + are being performed in rapid succession. There is usually less benefit + in using pipelined commands when each query takes many multiples of the client/server + round-trip time to execute. A 100-statement operation run on a server + 300ms round-trip-time away would take 30 seconds in network latency alone + without pipelining; with pipelining it may spend as little as 0.3s waiting for + results from the server. + + + + Use pipelined commands when your application does lots of small + INSERT, UPDATE and + DELETE operations that can't easily be transformed + into operations on sets, or into a COPY operation. + + + + Pipeline mode is not useful when information from one operation is required by + the client to produce the next operation. In such cases, the client + would have to introduce a synchronization point and wait for a full client/server + round-trip to get the results it needs. However, it's often possible to + adjust the client design to exchange the required information server-side. + Read-modify-write cycles are especially good candidates; for example: + +BEGIN; +SELECT x FROM mytable WHERE id = 42 FOR UPDATE; +-- result: x=2 +-- client adds 1 to x: +UPDATE mytable SET x = 3 WHERE id = 42; +COMMIT; + + could be much more efficiently done with: + +UPDATE mytable SET x = x + 1 WHERE id = 42; + + + + + Pipelining is less useful, and more complex, when a single pipeline contains + multiple transactions (see ). + + + + Retrieving Query Results Row-by-Row @@ -4966,6 +5477,13 @@ int PQflush(PGconn *conn); Each object should be freed with as usual. + + When using pipeline mode, single-row mode needs to be activated for each + query in the pipeline before retrieving results for that query + with PQgetResult. + See for more information. + + diff --git a/doc/src/sgml/lobj.sgml b/doc/src/sgml/lobj.sgml index 6d46da42e2..012e44c736 100644 --- a/doc/src/sgml/lobj.sgml +++ b/doc/src/sgml/lobj.sgml @@ -130,6 +130,10 @@ libpq library. + + Client applications cannot use these functions while a libpq connection is in pipeline mode. + + Creating a Large Object diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 5272eed9ab..f74378110a 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -1019,6 +1019,12 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query, walres->err = _("empty query"); break; + case PGRES_PIPELINE_SYNC: + case PGRES_PIPELINE_ABORTED: + walres->status = WALRCV_ERROR; + walres->err = _("unexpected pipeline mode"); + break; + case PGRES_NONFATAL_ERROR: case PGRES_FATAL_ERROR: case PGRES_BAD_RESPONSE: diff --git a/src/bin/pg_amcheck/pg_amcheck.c b/src/bin/pg_amcheck/pg_amcheck.c index 008a75d207..c9d9900693 100644 --- a/src/bin/pg_amcheck/pg_amcheck.c +++ b/src/bin/pg_amcheck/pg_amcheck.c @@ -929,6 +929,8 @@ should_processing_continue(PGresult *res) case PGRES_COPY_IN: case PGRES_COPY_BOTH: case PGRES_SINGLE_TUPLE: + case PGRES_PIPELINE_SYNC: + case PGRES_PIPELINE_ABORTED: return false; } return true; diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index bbc1f90481..5c48c14191 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -179,3 +179,7 @@ PQgetgssctx 176 PQsetSSLKeyPassHook_OpenSSL 177 PQgetSSLKeyPassHook_OpenSSL 178 PQdefaultSSLKeyPassHook_OpenSSL 179 +PQenterPipelineMode 180 +PQexitPipelineMode 181 +PQpipelineSync 182 +PQpipelineStatus 183 diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index 4e21057d0f..53b354abb2 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -522,6 +522,23 @@ pqDropConnection(PGconn *conn, bool flushInput) } } +/* + * pqFreeCommandQueue + * Free all the entries of PGcmdQueueEntry queue passed. + */ +static void +pqFreeCommandQueue(PGcmdQueueEntry *queue) +{ + while (queue != NULL) + { + PGcmdQueueEntry *cur = queue; + + queue = cur->next; + if (cur->query) + free(cur->query); + free(cur); + } +} /* * pqDropServerData @@ -553,6 +570,12 @@ pqDropServerData(PGconn *conn) } conn->notifyHead = conn->notifyTail = NULL; + pqFreeCommandQueue(conn->cmd_queue_head); + conn->cmd_queue_head = conn->cmd_queue_tail = NULL; + + pqFreeCommandQueue(conn->cmd_queue_recycle); + conn->cmd_queue_recycle = NULL; + /* Reset ParameterStatus data, as well as variables deduced from it */ pstatus = conn->pstatus; while (pstatus != NULL) @@ -2459,6 +2482,7 @@ keep_going: /* We will come back to here until there is /* Drop any PGresult we might have, too */ conn->asyncStatus = PGASYNC_IDLE; conn->xactStatus = PQTRANS_IDLE; + conn->pipelineStatus = PQ_PIPELINE_OFF; pqClearAsyncResult(conn); /* Reset conn->status to put the state machine in the right state */ @@ -3917,6 +3941,7 @@ makeEmptyPGconn(void) conn->status = CONNECTION_BAD; conn->asyncStatus = PGASYNC_IDLE; + conn->pipelineStatus = PQ_PIPELINE_OFF; conn->xactStatus = PQTRANS_IDLE; conn->options_valid = false; conn->nonblocking = false; @@ -4084,8 +4109,6 @@ freePGconn(PGconn *conn) if (conn->connip) free(conn->connip); /* Note that conn->Pfdebug is not ours to close or free */ - if (conn->last_query) - free(conn->last_query); if (conn->write_err_msg) free(conn->write_err_msg); if (conn->inBuffer) @@ -4174,6 +4197,7 @@ closePGconn(PGconn *conn) conn->status = CONNECTION_BAD; /* Well, not really _bad_ - just absent */ conn->asyncStatus = PGASYNC_IDLE; conn->xactStatus = PQTRANS_IDLE; + conn->pipelineStatus = PQ_PIPELINE_OFF; pqClearAsyncResult(conn); /* deallocate result */ resetPQExpBuffer(&conn->errorMessage); release_conn_addrinfo(conn); @@ -6726,6 +6750,15 @@ PQbackendPID(const PGconn *conn) return conn->be_pid; } +PGpipelineStatus +PQpipelineStatus(const PGconn *conn) +{ + if (!conn) + return PQ_PIPELINE_OFF; + + return conn->pipelineStatus; +} + int PQconnectionNeedsPassword(const PGconn *conn) { diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index 9a038043b2..f3443708a6 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -39,7 +39,9 @@ char *const pgresStatus[] = { "PGRES_NONFATAL_ERROR", "PGRES_FATAL_ERROR", "PGRES_COPY_BOTH", - "PGRES_SINGLE_TUPLE" + "PGRES_SINGLE_TUPLE", + "PGRES_PIPELINE_SYNC", + "PGRES_PIPELINE_ABORTED" }; /* @@ -71,6 +73,8 @@ static PGresult *PQexecFinish(PGconn *conn); static int PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target); static int check_field_number(const PGresult *res, int field_num); +static void pqPipelineProcessQueue(PGconn *conn); +static int pqPipelineFlush(PGconn *conn); /* ---------------- @@ -1171,7 +1175,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp) conn->next_result = conn->result; conn->result = res; /* And mark the result ready to return */ - conn->asyncStatus = PGASYNC_READY; + conn->asyncStatus = PGASYNC_READY_MORE; } return 1; @@ -1184,6 +1188,87 @@ fail: } +/* + * pqAllocCmdQueueEntry + * Get a command queue entry for caller to fill. + * + * If the recycle queue has a free element, that is returned; if not, a + * fresh one is allocated. Caller is responsible for adding it to the + * command queue (pqAppendCmdQueueEntry) once the struct is filled in, or + * releasing the memory (pqRecycleCmdQueueEntry) if an error occurs. + * + * If allocation fails, sets the error message and returns NULL. + */ +static PGcmdQueueEntry * +pqAllocCmdQueueEntry(PGconn *conn) +{ + PGcmdQueueEntry *entry; + + if (conn->cmd_queue_recycle == NULL) + { + entry = (PGcmdQueueEntry *) malloc(sizeof(PGcmdQueueEntry)); + if (entry == NULL) + { + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("out of memory\n")); + return NULL; + } + } + else + { + entry = conn->cmd_queue_recycle; + conn->cmd_queue_recycle = entry->next; + } + entry->next = NULL; + entry->query = NULL; + + return entry; +} + +/* + * pqAppendCmdQueueEntry + * Append a caller-allocated command queue entry to the queue. + * + * The query itself must already have been put in the output buffer by the + * caller. + */ +static void +pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry) +{ + Assert(entry->next == NULL); + + if (conn->cmd_queue_head == NULL) + conn->cmd_queue_head = entry; + else + conn->cmd_queue_tail->next = entry; + + conn->cmd_queue_tail = entry; +} + +/* + * pqRecycleCmdQueueEntry + * Push a command queue entry onto the freelist. + */ +static void +pqRecycleCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry) +{ + if (entry == NULL) + return; + + /* recyclable entries should not have a follow-on command */ + Assert(entry->next == NULL); + + if (entry->query) + { + free(entry->query); + entry->query = NULL; + } + + entry->next = conn->cmd_queue_recycle; + conn->cmd_queue_recycle = entry; +} + + /* * PQsendQuery * Submit a query, but don't wait for it to finish @@ -1209,9 +1294,15 @@ PQsendQueryContinue(PGconn *conn, const char *query) static int PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery) { + PGcmdQueueEntry *entry = NULL; + if (!PQsendQueryStart(conn, newQuery)) return 0; + entry = pqAllocCmdQueueEntry(conn); + if (entry == NULL) + return 0; /* error msg already set */ + /* check the argument */ if (!query) { @@ -1220,37 +1311,75 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery) return 0; } - /* construct the outgoing Query message */ - if (pqPutMsgStart('Q', conn) < 0 || - pqPuts(query, conn) < 0 || - pqPutMsgEnd(conn) < 0) + /* Send the query message(s) */ + if (conn->pipelineStatus == PQ_PIPELINE_OFF) { - /* error message should be set up already */ - return 0; + /* construct the outgoing Query message */ + if (pqPutMsgStart('Q', conn) < 0 || + pqPuts(query, conn) < 0 || + pqPutMsgEnd(conn) < 0) + { + /* error message should be set up already */ + return 0; + } + + /* remember we are using simple query protocol */ + entry->queryclass = PGQUERY_SIMPLE; + /* and remember the query text too, if possible */ + entry->query = strdup(query); } + else + { + /* + * In pipeline mode we cannot use the simple protocol, so we send + * Parse, Bind, Describe Portal, Execute. + */ + if (pqPutMsgStart('P', conn) < 0 || + pqPuts("", conn) < 0 || + pqPuts(query, conn) < 0 || + pqPutInt(0, 2, conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + if (pqPutMsgStart('B', conn) < 0 || + pqPuts("", conn) < 0 || + pqPuts("", conn) < 0 || + pqPutInt(0, 2, conn) < 0 || + pqPutInt(0, 2, conn) < 0 || + pqPutInt(0, 2, conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + if (pqPutMsgStart('D', conn) < 0 || + pqPutc('P', conn) < 0 || + pqPuts("", conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + if (pqPutMsgStart('E', conn) < 0 || + pqPuts("", conn) < 0 || + pqPutInt(0, 4, conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; - /* remember we are using simple query protocol */ - conn->queryclass = PGQUERY_SIMPLE; - - /* and remember the query text too, if possible */ - /* if insufficient memory, last_query just winds up NULL */ - if (conn->last_query) - free(conn->last_query); - conn->last_query = strdup(query); + entry->queryclass = PGQUERY_EXTENDED; + entry->query = strdup(query); + } /* * Give the data a push. In nonblock mode, don't complain if we're unable * to send it all; PQgetResult() will do any additional flushing needed. */ - if (pqFlush(conn) < 0) - { - /* error message should be set up already */ - return 0; - } + if (pqPipelineFlush(conn) < 0) + goto sendFailed; /* OK, it's launched! */ - conn->asyncStatus = PGASYNC_BUSY; + pqAppendCmdQueueEntry(conn, entry); + if (conn->pipelineStatus == PQ_PIPELINE_OFF) + conn->asyncStatus = PGASYNC_BUSY; return 1; + +sendFailed: + pqRecycleCmdQueueEntry(conn, entry); + /* error message should be set up already */ + return 0; } /* @@ -1307,6 +1436,8 @@ PQsendPrepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes) { + PGcmdQueueEntry *entry = NULL; + if (!PQsendQueryStart(conn, true)) return 0; @@ -1330,6 +1461,10 @@ PQsendPrepare(PGconn *conn, return 0; } + entry = pqAllocCmdQueueEntry(conn); + if (entry == NULL) + return 0; /* error msg already set */ + /* construct the Parse message */ if (pqPutMsgStart('P', conn) < 0 || pqPuts(stmtName, conn) < 0 || @@ -1356,32 +1491,38 @@ PQsendPrepare(PGconn *conn, if (pqPutMsgEnd(conn) < 0) goto sendFailed; - /* construct the Sync message */ - if (pqPutMsgStart('S', conn) < 0 || - pqPutMsgEnd(conn) < 0) - goto sendFailed; + /* Add a Sync, unless in pipeline mode. */ + if (conn->pipelineStatus == PQ_PIPELINE_OFF) + { + if (pqPutMsgStart('S', conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + } /* remember we are doing just a Parse */ - conn->queryclass = PGQUERY_PREPARE; + entry->queryclass = PGQUERY_PREPARE; /* and remember the query text too, if possible */ - /* if insufficient memory, last_query just winds up NULL */ - if (conn->last_query) - free(conn->last_query); - conn->last_query = strdup(query); + /* if insufficient memory, query just winds up NULL */ + entry->query = strdup(query); + + pqAppendCmdQueueEntry(conn, entry); + + if (conn->pipelineStatus == PQ_PIPELINE_OFF) + conn->asyncStatus = PGASYNC_BUSY; /* - * Give the data a push. In nonblock mode, don't complain if we're unable - * to send it all; PQgetResult() will do any additional flushing needed. + * Give the data a push (in pipeline mode, only if we're past the size + * threshold). In nonblock mode, don't complain if we're unable to send + * it all; PQgetResult() will do any additional flushing needed. */ - if (pqFlush(conn) < 0) + if (pqPipelineFlush(conn) < 0) goto sendFailed; - /* OK, it's launched! */ - conn->asyncStatus = PGASYNC_BUSY; return 1; sendFailed: + pqRecycleCmdQueueEntry(conn, entry); /* error message should be set up already */ return 0; } @@ -1429,7 +1570,8 @@ PQsendQueryPrepared(PGconn *conn, } /* - * Common startup code for PQsendQuery and sibling routines + * PQsendQueryStart + * Common startup code for PQsendQuery and sibling routines */ static bool PQsendQueryStart(PGconn *conn, bool newQuery) @@ -1450,20 +1592,57 @@ PQsendQueryStart(PGconn *conn, bool newQuery) libpq_gettext("no connection to the server\n")); return false; } - /* Can't send while already busy, either. */ - if (conn->asyncStatus != PGASYNC_IDLE) + + /* Can't send while already busy, either, unless enqueuing for later */ + if (conn->asyncStatus != PGASYNC_IDLE && + conn->pipelineStatus == PQ_PIPELINE_OFF) { appendPQExpBufferStr(&conn->errorMessage, libpq_gettext("another command is already in progress\n")); return false; } - /* initialize async result-accumulation state */ - pqClearAsyncResult(conn); + if (conn->pipelineStatus != PQ_PIPELINE_OFF) + { + /* + * When enqueuing commands we don't change much of the connection + * state since it's already in use for the current command. The + * connection state will get updated when pqPipelineProcessQueue() + * advances to start processing the queued message. + * + * Just make sure we can safely enqueue given the current connection + * state. We can enqueue behind another queue item, or behind a + * non-queue command (one that sends its own sync), but we can't + * enqueue if the connection is in a copy state. + */ + switch (conn->asyncStatus) + { + case PGASYNC_IDLE: + case PGASYNC_READY: + case PGASYNC_READY_MORE: + case PGASYNC_BUSY: + /* ok to queue */ + break; + case PGASYNC_COPY_IN: + case PGASYNC_COPY_OUT: + case PGASYNC_COPY_BOTH: + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("cannot queue commands during COPY\n")); + return false; + } + } + else + { + /* + * This command's results will come in immediately. Initialize async + * result-accumulation state + */ + pqClearAsyncResult(conn); - /* reset single-row processing mode */ - conn->singleRowMode = false; + /* reset single-row processing mode */ + conn->singleRowMode = false; + } /* ready to send command message */ return true; } @@ -1487,10 +1666,16 @@ PQsendQueryGuts(PGconn *conn, int resultFormat) { int i; + PGcmdQueueEntry *entry; + + entry = pqAllocCmdQueueEntry(conn); + if (entry == NULL) + return 0; /* error msg already set */ /* - * We will send Parse (if needed), Bind, Describe Portal, Execute, Sync, - * using specified statement name and the unnamed portal. + * We will send Parse (if needed), Bind, Describe Portal, Execute, Sync + * (if not in pipeline mode), using specified statement name and the + * unnamed portal. */ if (command) @@ -1600,35 +1785,38 @@ PQsendQueryGuts(PGconn *conn, pqPutMsgEnd(conn) < 0) goto sendFailed; - /* construct the Sync message */ - if (pqPutMsgStart('S', conn) < 0 || - pqPutMsgEnd(conn) < 0) - goto sendFailed; + /* construct the Sync message if not in pipeline mode */ + if (conn->pipelineStatus == PQ_PIPELINE_OFF) + { + if (pqPutMsgStart('S', conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + } /* remember we are using extended query protocol */ - conn->queryclass = PGQUERY_EXTENDED; + entry->queryclass = PGQUERY_EXTENDED; /* and remember the query text too, if possible */ - /* if insufficient memory, last_query just winds up NULL */ - if (conn->last_query) - free(conn->last_query); + /* if insufficient memory, query just winds up NULL */ if (command) - conn->last_query = strdup(command); - else - conn->last_query = NULL; + entry->query = strdup(command); /* - * Give the data a push. In nonblock mode, don't complain if we're unable - * to send it all; PQgetResult() will do any additional flushing needed. + * Give the data a push (in pipeline mode, only if we're past the size + * threshold). In nonblock mode, don't complain if we're unable to send + * it all; PQgetResult() will do any additional flushing needed. */ - if (pqFlush(conn) < 0) + if (pqPipelineFlush(conn) < 0) goto sendFailed; /* OK, it's launched! */ - conn->asyncStatus = PGASYNC_BUSY; + pqAppendCmdQueueEntry(conn, entry); + if (conn->pipelineStatus == PQ_PIPELINE_OFF) + conn->asyncStatus = PGASYNC_BUSY; return 1; sendFailed: + pqRecycleCmdQueueEntry(conn, entry); /* error message should be set up already */ return 0; } @@ -1647,8 +1835,9 @@ PQsetSingleRowMode(PGconn *conn) return 0; if (conn->asyncStatus != PGASYNC_BUSY) return 0; - if (conn->queryclass != PGQUERY_SIMPLE && - conn->queryclass != PGQUERY_EXTENDED) + if (!conn->cmd_queue_head || + (conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE && + conn->cmd_queue_head->queryclass != PGQUERY_EXTENDED)) return 0; if (conn->result) return 0; @@ -1726,14 +1915,17 @@ PQisBusy(PGconn *conn) return conn->asyncStatus == PGASYNC_BUSY || conn->write_failed; } - /* * PQgetResult * Get the next PGresult produced by a query. Returns NULL if no * query work remains or an error has occurred (e.g. out of * memory). + * + * In pipeline mode, once all the result of a query have been returned, + * PQgetResult returns NULL to let the user know that the next + * query is being processed. At the end of the pipeline, returns a + * result with PQresultStatus(result) == PGRES_PIPELINE_SYNC. */ - PGresult * PQgetResult(PGconn *conn) { @@ -1803,8 +1995,62 @@ PQgetResult(PGconn *conn) { case PGASYNC_IDLE: res = NULL; /* query is complete */ + if (conn->pipelineStatus != PQ_PIPELINE_OFF) + { + /* + * We're about to return the NULL that terminates the round of + * results from the current query; prepare to send the results + * of the next query when we're called next. Also, since this + * is the start of the results of the next query, clear any + * prior error message. + */ + resetPQExpBuffer(&conn->errorMessage); + pqPipelineProcessQueue(conn); + } break; case PGASYNC_READY: + + /* + * For any query type other than simple query protocol, we advance + * the command queue here. This is because for simple query + * protocol we can get the READY state multiple times before the + * command is actually complete, since the command string can + * contain many queries. In simple query protocol, the queue + * advance is done by fe-protocol3 when it receives ReadyForQuery. + */ + if (conn->cmd_queue_head && + conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE) + pqCommandQueueAdvance(conn); + res = pqPrepareAsyncResult(conn); + if (conn->pipelineStatus != PQ_PIPELINE_OFF) + { + /* + * We're about to send the results of the current query. Set + * us idle now, and ... + */ + conn->asyncStatus = PGASYNC_IDLE; + + /* + * ... in cases when we're sending a pipeline-sync result, + * move queue processing forwards immediately, so that next + * time we're called, we're prepared to return the next result + * received from the server. In all other cases, leave the + * queue state change for next time, so that a terminating + * NULL result is sent. + * + * (In other words: we don't return a NULL after a pipeline + * sync.) + */ + if (res && res->resultStatus == PGRES_PIPELINE_SYNC) + pqPipelineProcessQueue(conn); + } + else + { + /* Set the state back to BUSY, allowing parsing to proceed. */ + conn->asyncStatus = PGASYNC_BUSY; + } + break; + case PGASYNC_READY_MORE: res = pqPrepareAsyncResult(conn); /* Set the state back to BUSY, allowing parsing to proceed. */ conn->asyncStatus = PGASYNC_BUSY; @@ -1985,6 +2231,13 @@ PQexecStart(PGconn *conn) if (!conn) return false; + if (conn->pipelineStatus != PQ_PIPELINE_OFF) + { + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("synchronous command execution functions are not allowed in pipeline mode\n")); + return false; + } + /* * Since this is the beginning of a query cycle, reset the error buffer. */ @@ -2148,6 +2401,8 @@ PQsendDescribePortal(PGconn *conn, const char *portal) static int PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target) { + PGcmdQueueEntry *entry = NULL; + /* Treat null desc_target as empty string */ if (!desc_target) desc_target = ""; @@ -2155,6 +2410,10 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target) if (!PQsendQueryStart(conn, true)) return 0; + entry = pqAllocCmdQueueEntry(conn); + if (entry == NULL) + return 0; /* error msg already set */ + /* construct the Describe message */ if (pqPutMsgStart('D', conn) < 0 || pqPutc(desc_type, conn) < 0 || @@ -2163,32 +2422,32 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target) goto sendFailed; /* construct the Sync message */ - if (pqPutMsgStart('S', conn) < 0 || - pqPutMsgEnd(conn) < 0) - goto sendFailed; - - /* remember we are doing a Describe */ - conn->queryclass = PGQUERY_DESCRIBE; - - /* reset last_query string (not relevant now) */ - if (conn->last_query) + if (conn->pipelineStatus == PQ_PIPELINE_OFF) { - free(conn->last_query); - conn->last_query = NULL; + if (pqPutMsgStart('S', conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; } + /* remember we are doing a Describe */ + entry->queryclass = PGQUERY_DESCRIBE; + /* - * Give the data a push. In nonblock mode, don't complain if we're unable - * to send it all; PQgetResult() will do any additional flushing needed. + * Give the data a push (in pipeline mode, only if we're past the size + * threshold). In nonblock mode, don't complain if we're unable to send + * it all; PQgetResult() will do any additional flushing needed. */ - if (pqFlush(conn) < 0) + if (pqPipelineFlush(conn) < 0) goto sendFailed; /* OK, it's launched! */ - conn->asyncStatus = PGASYNC_BUSY; + pqAppendCmdQueueEntry(conn, entry); + if (conn->pipelineStatus == PQ_PIPELINE_OFF) + conn->asyncStatus = PGASYNC_BUSY; return 1; sendFailed: + pqRecycleCmdQueueEntry(conn, entry); /* error message should be set up already */ return 0; } @@ -2327,7 +2586,8 @@ PQputCopyEnd(PGconn *conn, const char *errormsg) * If we sent the COPY command in extended-query mode, we must issue a * Sync as well. */ - if (conn->queryclass != PGQUERY_SIMPLE) + if (conn->cmd_queue_head && + conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE) { if (pqPutMsgStart('S', conn) < 0 || pqPutMsgEnd(conn) < 0) @@ -2541,6 +2801,13 @@ PQfn(PGconn *conn, */ resetPQExpBuffer(&conn->errorMessage); + if (conn->pipelineStatus != PQ_PIPELINE_OFF) + { + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("PQfn not allowed in pipeline mode\n")); + return NULL; + } + if (conn->sock == PGINVALID_SOCKET || conn->asyncStatus != PGASYNC_IDLE || conn->result != NULL) { @@ -2555,6 +2822,277 @@ PQfn(PGconn *conn, args, nargs); } +/* ====== Pipeline mode support ======== */ + +/* + * PQenterPipelineMode + * Put an idle connection in pipeline mode. + * + * Returns 1 on success. On failure, errorMessage is set and 0 is returned. + * + * Commands submitted after this can be pipelined on the connection; + * there's no requirement to wait for one to finish before the next is + * dispatched. + * + * Queuing of a new query or syncing during COPY is not allowed. + * + * A set of commands is terminated by a PQpipelineSync. Multiple sync + * points can be established while in pipeline mode. Pipeline mode can + * be exited by calling PQexitPipelineMode() once all results are processed. + * + * This doesn't actually send anything on the wire, it just puts libpq + * into a state where it can pipeline work. + */ +int +PQenterPipelineMode(PGconn *conn) +{ + if (!conn) + return 0; + + /* succeed with no action if already in pipeline mode */ + if (conn->pipelineStatus != PQ_PIPELINE_OFF) + return 1; + + if (conn->asyncStatus != PGASYNC_IDLE) + { + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("cannot enter pipeline mode, connection not idle\n")); + return 0; + } + + conn->pipelineStatus = PQ_PIPELINE_ON; + + return 1; +} + +/* + * PQexitPipelineMode + * End pipeline mode and return to normal command mode. + * + * Returns 1 in success (pipeline mode successfully ended, or not in pipeline + * mode). + * + * Returns 0 if in pipeline mode and cannot be ended yet. Error message will + * be set. + */ +int +PQexitPipelineMode(PGconn *conn) +{ + if (!conn) + return 0; + + if (conn->pipelineStatus == PQ_PIPELINE_OFF) + return 1; + + switch (conn->asyncStatus) + { + case PGASYNC_READY: + case PGASYNC_READY_MORE: + /* there are some uncollected results */ + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("cannot exit pipeline mode with uncollected results\n")); + return 0; + + case PGASYNC_BUSY: + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("cannot exit pipeline mode while busy\n")); + return 0; + + default: + /* OK */ + break; + } + + /* still work to process */ + if (conn->cmd_queue_head != NULL) + { + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("cannot exit pipeline mode with uncollected results\n")); + return 0; + } + + conn->pipelineStatus = PQ_PIPELINE_OFF; + conn->asyncStatus = PGASYNC_IDLE; + + /* Flush any pending data in out buffer */ + if (pqFlush(conn) < 0) + return 0; /* error message is setup already */ + return 1; +} + +/* + * pqCommandQueueAdvance + * Remove one query from the command queue, when we receive + * all results from the server that pertain to it. + */ +void +pqCommandQueueAdvance(PGconn *conn) +{ + PGcmdQueueEntry *prevquery; + + if (conn->cmd_queue_head == NULL) + return; + + /* delink from queue */ + prevquery = conn->cmd_queue_head; + conn->cmd_queue_head = conn->cmd_queue_head->next; + + /* and make it recyclable */ + prevquery->next = NULL; + pqRecycleCmdQueueEntry(conn, prevquery); +} + +/* + * pqPipelineProcessQueue: subroutine for PQgetResult + * In pipeline mode, start processing the results of the next query in the queue. + */ +void +pqPipelineProcessQueue(PGconn *conn) +{ + switch (conn->asyncStatus) + { + case PGASYNC_COPY_IN: + case PGASYNC_COPY_OUT: + case PGASYNC_COPY_BOTH: + case PGASYNC_READY: + case PGASYNC_READY_MORE: + case PGASYNC_BUSY: + /* client still has to process current query or results */ + return; + case PGASYNC_IDLE: + /* next query please */ + break; + } + + /* Nothing to do if not in pipeline mode, or queue is empty */ + if (conn->pipelineStatus == PQ_PIPELINE_OFF || + conn->cmd_queue_head == NULL) + return; + + /* Initialize async result-accumulation state */ + pqClearAsyncResult(conn); + + /* + * Reset single-row processing mode. (Client has to set it up for each + * query, if desired.) + */ + conn->singleRowMode = false; + + if (conn->pipelineStatus == PQ_PIPELINE_ABORTED && + conn->cmd_queue_head->queryclass != PGQUERY_SYNC) + { + /* + * In an aborted pipeline we don't get anything from the server for + * each result; we're just discarding commands from the queue until we + * get to the next sync from the server. + * + * The PGRES_PIPELINE_ABORTED results tell the client that its queries + * got aborted. + */ + conn->result = PQmakeEmptyPGresult(conn, PGRES_PIPELINE_ABORTED); + if (!conn->result) + { + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("out of memory\n")); + pqSaveErrorResult(conn); + return; + } + conn->asyncStatus = PGASYNC_READY; + } + else + { + /* allow parsing to continue */ + conn->asyncStatus = PGASYNC_BUSY; + } +} + +/* + * PQpipelineSync + * Send a Sync message as part of a pipeline, and flush to server + * + * It's legal to start submitting more commands in the pipeline immediately, + * without waiting for the results of the current pipeline. There's no need to + * end pipeline mode and start it again. + * + * If a command in a pipeline fails, every subsequent command up to and including + * the result to the Sync message sent by PQpipelineSync gets set to + * PGRES_PIPELINE_ABORTED state. If the whole pipeline is processed without + * error, a PGresult with PGRES_PIPELINE_SYNC is produced. + * + * Queries can already have been sent before PQpipelineSync is called, but + * PQpipelineSync need to be called before retrieving command results. + * + * The connection will remain in pipeline mode and unavailable for new + * synchronous command execution functions until all results from the pipeline + * are processed by the client. + */ +int +PQpipelineSync(PGconn *conn) +{ + PGcmdQueueEntry *entry; + + if (!conn) + return 0; + + if (conn->pipelineStatus == PQ_PIPELINE_OFF) + { + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("cannot send pipeline when not in pipeline mode\n")); + return 0; + } + + switch (conn->asyncStatus) + { + case PGASYNC_COPY_IN: + case PGASYNC_COPY_OUT: + case PGASYNC_COPY_BOTH: + /* should be unreachable */ + appendPQExpBufferStr(&conn->errorMessage, + "internal error: cannot send pipeline while in COPY\n"); + return 0; + case PGASYNC_READY: + case PGASYNC_READY_MORE: + case PGASYNC_BUSY: + case PGASYNC_IDLE: + /* OK to send sync */ + break; + } + + entry = pqAllocCmdQueueEntry(conn); + if (entry == NULL) + return 0; /* error msg already set */ + + entry->queryclass = PGQUERY_SYNC; + entry->query = NULL; + + /* construct the Sync message */ + if (pqPutMsgStart('S', conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + + pqAppendCmdQueueEntry(conn, entry); + + /* + * Give the data a push. In nonblock mode, don't complain if we're unable + * to send it all; PQgetResult() will do any additional flushing needed. + */ + if (PQflush(conn) < 0) + goto sendFailed; + + /* + * Call pqPipelineProcessQueue so the user can call start calling + * PQgetResult. + */ + pqPipelineProcessQueue(conn); + + return 1; + +sendFailed: + pqRecycleCmdQueueEntry(conn, entry); + /* error message should be set up already */ + return 0; +} + /* ====== accessor funcs for PGresult ======== */ @@ -2569,7 +3107,7 @@ PQresultStatus(const PGresult *res) char * PQresStatus(ExecStatusType status) { - if ((unsigned int) status >= sizeof pgresStatus / sizeof pgresStatus[0]) + if ((unsigned int) status >= lengthof(pgresStatus)) return libpq_gettext("invalid ExecStatusType code"); return pgresStatus[status]; } @@ -3152,6 +3690,23 @@ PQflush(PGconn *conn) return pqFlush(conn); } +/* + * pqPipelineFlush + * + * In pipeline mode, data will be flushed only when the out buffer reaches the + * threshold value. In non-pipeline mode, it behaves as stock pqFlush. + * + * Returns 0 on success. + */ +static int +pqPipelineFlush(PGconn *conn) +{ + if ((conn->pipelineStatus != PQ_PIPELINE_ON) || + (conn->outCount >= OUTBUFFER_THRESHOLD)) + return pqFlush(conn); + return 0; +} + /* * PQfreemem - safely frees memory allocated diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index eb55d528fb..306e89acfd 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -158,6 +158,18 @@ pqParseInput3(PGconn *conn) if (conn->asyncStatus != PGASYNC_IDLE) return; + /* + * We're also notionally not-IDLE when in pipeline mode the state + * says "idle" (so we have completed receiving the results of one + * query from the server and dispatched them to the application) + * but another query is queued; yield back control to caller so + * that they can initiate processing of the next query in the + * queue. + */ + if (conn->pipelineStatus != PQ_PIPELINE_OFF && + conn->cmd_queue_head != NULL) + return; + /* * Unexpected message in IDLE state; need to recover somehow. * ERROR messages are handled using the notice processor; @@ -179,6 +191,7 @@ pqParseInput3(PGconn *conn) } else { + /* Any other case is unexpected and we summarily skip it */ pqInternalNotice(&conn->noticeHooks, "message type 0x%02x arrived from server while idle", id); @@ -217,10 +230,37 @@ pqParseInput3(PGconn *conn) return; conn->asyncStatus = PGASYNC_READY; break; - case 'Z': /* backend is ready for new query */ + case 'Z': /* sync response, backend is ready for new + * query */ if (getReadyForQuery(conn)) return; - conn->asyncStatus = PGASYNC_IDLE; + if (conn->pipelineStatus != PQ_PIPELINE_OFF) + { + conn->result = PQmakeEmptyPGresult(conn, + PGRES_PIPELINE_SYNC); + if (!conn->result) + { + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("out of memory")); + pqSaveErrorResult(conn); + } + else + { + conn->pipelineStatus = PQ_PIPELINE_ON; + conn->asyncStatus = PGASYNC_READY; + } + } + else + { + /* + * In simple query protocol, advance the command queue + * (see PQgetResult). + */ + if (conn->cmd_queue_head && + conn->cmd_queue_head->queryclass == PGQUERY_SIMPLE) + pqCommandQueueAdvance(conn); + conn->asyncStatus = PGASYNC_IDLE; + } break; case 'I': /* empty query */ if (conn->result == NULL) @@ -238,7 +278,8 @@ pqParseInput3(PGconn *conn) break; case '1': /* Parse Complete */ /* If we're doing PQprepare, we're done; else ignore */ - if (conn->queryclass == PGQUERY_PREPARE) + if (conn->cmd_queue_head && + conn->cmd_queue_head->queryclass == PGQUERY_PREPARE) { if (conn->result == NULL) { @@ -285,7 +326,8 @@ pqParseInput3(PGconn *conn) conn->inCursor += msgLength; } else if (conn->result == NULL || - conn->queryclass == PGQUERY_DESCRIBE) + (conn->cmd_queue_head && + conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE)) { /* First 'T' in a query sequence */ if (getRowDescriptions(conn, msgLength)) @@ -316,7 +358,8 @@ pqParseInput3(PGconn *conn) * instead of PGRES_TUPLES_OK. Otherwise we can just * ignore this message. */ - if (conn->queryclass == PGQUERY_DESCRIBE) + if (conn->cmd_queue_head && + conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE) { if (conn->result == NULL) { @@ -445,7 +488,7 @@ handleSyncLoss(PGconn *conn, char id, int msgLength) id, msgLength); /* build an error result holding the error message */ pqSaveErrorResult(conn); - conn->asyncStatus = PGASYNC_READY; /* drop out of GetResult wait loop */ + conn->asyncStatus = PGASYNC_READY; /* drop out of PQgetResult wait loop */ /* flush input data since we're giving up on processing it */ pqDropConnection(conn, true); conn->status = CONNECTION_BAD; /* No more connection to backend */ @@ -471,7 +514,9 @@ getRowDescriptions(PGconn *conn, int msgLength) * PGresult created by getParamDescriptions, and we should fill data into * that. Otherwise, create a new, empty PGresult. */ - if (conn->queryclass == PGQUERY_DESCRIBE) + if (!conn->cmd_queue_head || + (conn->cmd_queue_head && + conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE)) { if (conn->result) result = conn->result; @@ -568,7 +613,9 @@ getRowDescriptions(PGconn *conn, int msgLength) * If we're doing a Describe, we're done, and ready to pass the result * back to the client. */ - if (conn->queryclass == PGQUERY_DESCRIBE) + if ((!conn->cmd_queue_head) || + (conn->cmd_queue_head && + conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE)) { conn->asyncStatus = PGASYNC_READY; return 0; @@ -841,6 +888,10 @@ pqGetErrorNotice3(PGconn *conn, bool isError) PQExpBufferData workBuf; char id; + /* If in pipeline mode, set error indicator for it */ + if (isError && conn->pipelineStatus != PQ_PIPELINE_OFF) + conn->pipelineStatus = PQ_PIPELINE_ABORTED; + /* * If this is an error message, pre-emptively clear any incomplete query * result we may have. We'd just throw it away below anyway, and @@ -897,8 +948,8 @@ pqGetErrorNotice3(PGconn *conn, bool isError) * might need it for an error cursor display, which is only true if there * is a PG_DIAG_STATEMENT_POSITION field. */ - if (have_position && conn->last_query && res) - res->errQuery = pqResultStrdup(res, conn->last_query); + if (have_position && res && conn->cmd_queue_head && conn->cmd_queue_head->query) + res->errQuery = pqResultStrdup(res, conn->cmd_queue_head->query); /* * Now build the "overall" error message for PQresultErrorMessage. @@ -1817,7 +1868,8 @@ pqEndcopy3(PGconn *conn) * If we sent the COPY command in extended-query mode, we must issue a * Sync as well. */ - if (conn->queryclass != PGQUERY_SIMPLE) + if (conn->cmd_queue_head && + conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE) { if (pqPutMsgStart('S', conn) < 0 || pqPutMsgEnd(conn) < 0) @@ -1897,6 +1949,9 @@ pqFunctionCall3(PGconn *conn, Oid fnid, int avail; int i; + /* already validated by PQfn */ + Assert(conn->pipelineStatus == PQ_PIPELINE_OFF); + /* PQfn already validated connection state */ if (pqPutMsgStart('F', conn) < 0 || /* function call msg */ diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index fa9b62a844..cee42d4843 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -96,7 +96,10 @@ typedef enum PGRES_NONFATAL_ERROR, /* notice or warning message */ PGRES_FATAL_ERROR, /* query failed */ PGRES_COPY_BOTH, /* Copy In/Out data transfer in progress */ - PGRES_SINGLE_TUPLE /* single tuple from larger resultset */ + PGRES_SINGLE_TUPLE, /* single tuple from larger resultset */ + PGRES_PIPELINE_SYNC, /* pipeline synchronization point */ + PGRES_PIPELINE_ABORTED, /* Command didn't run because of an abort + * earlier in a pipeline */ } ExecStatusType; typedef enum @@ -136,6 +139,16 @@ typedef enum PQPING_NO_ATTEMPT /* connection not attempted (bad params) */ } PGPing; +/* + * PGpipelineStatus - Current status of pipeline mode + */ +typedef enum +{ + PQ_PIPELINE_OFF, + PQ_PIPELINE_ON, + PQ_PIPELINE_ABORTED +} PGpipelineStatus; + /* PGconn encapsulates a connection to the backend. * The contents of this struct are not supposed to be known to applications. */ @@ -327,6 +340,7 @@ extern int PQserverVersion(const PGconn *conn); extern char *PQerrorMessage(const PGconn *conn); extern int PQsocket(const PGconn *conn); extern int PQbackendPID(const PGconn *conn); +extern PGpipelineStatus PQpipelineStatus(const PGconn *conn); extern int PQconnectionNeedsPassword(const PGconn *conn); extern int PQconnectionUsedPassword(const PGconn *conn); extern int PQclientEncoding(const PGconn *conn); @@ -434,6 +448,11 @@ extern PGresult *PQgetResult(PGconn *conn); extern int PQisBusy(PGconn *conn); extern int PQconsumeInput(PGconn *conn); +/* Routines for pipeline mode management */ +extern int PQenterPipelineMode(PGconn *conn); +extern int PQexitPipelineMode(PGconn *conn); +extern int PQpipelineSync(PGconn *conn); + /* LISTEN/NOTIFY support */ extern PGnotify *PQnotifies(PGconn *conn); diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 2f052f61f8..6374ec657a 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -217,21 +217,16 @@ typedef enum { PGASYNC_IDLE, /* nothing's happening, dude */ PGASYNC_BUSY, /* query in progress */ - PGASYNC_READY, /* result ready for PQgetResult */ + PGASYNC_READY, /* query done, waiting for client to fetch + * result */ + PGASYNC_READY_MORE, /* query done, waiting for client to fetch + * result, more results expected from this + * query */ PGASYNC_COPY_IN, /* Copy In data transfer in progress */ PGASYNC_COPY_OUT, /* Copy Out data transfer in progress */ PGASYNC_COPY_BOTH /* Copy In/Out data transfer in progress */ } PGAsyncStatusType; -/* PGQueryClass tracks which query protocol we are now executing */ -typedef enum -{ - PGQUERY_SIMPLE, /* simple Query protocol (PQexec) */ - PGQUERY_EXTENDED, /* full Extended protocol (PQexecParams) */ - PGQUERY_PREPARE, /* Parse only (PQprepare) */ - PGQUERY_DESCRIBE /* Describe Statement or Portal */ -} PGQueryClass; - /* Target server type (decoded value of target_session_attrs) */ typedef enum { @@ -305,6 +300,29 @@ typedef enum pg_conn_host_type CHT_UNIX_SOCKET } pg_conn_host_type; +/* + * PGQueryClass tracks which query protocol is in use for each command queue + * entry, or special operation in execution + */ +typedef enum +{ + PGQUERY_SIMPLE, /* simple Query protocol (PQexec) */ + PGQUERY_EXTENDED, /* full Extended protocol (PQexecParams) */ + PGQUERY_PREPARE, /* Parse only (PQprepare) */ + PGQUERY_DESCRIBE, /* Describe Statement or Portal */ + PGQUERY_SYNC /* Sync (at end of a pipeline) */ +} PGQueryClass; + +/* + * An entry in the pending command queue. + */ +typedef struct PGcmdQueueEntry +{ + PGQueryClass queryclass; /* Query type */ + char *query; /* SQL command, or NULL if none/unknown/OOM */ + struct PGcmdQueueEntry *next; /* list link */ +} PGcmdQueueEntry; + /* * pg_conn_host stores all information about each of possibly several hosts * mentioned in the connection string. Most fields are derived by splitting @@ -389,12 +407,11 @@ struct pg_conn ConnStatusType status; PGAsyncStatusType asyncStatus; PGTransactionStatusType xactStatus; /* never changes to ACTIVE */ - PGQueryClass queryclass; - char *last_query; /* last SQL command, or NULL if unknown */ char last_sqlstate[6]; /* last reported SQLSTATE */ bool options_valid; /* true if OK to attempt connection */ bool nonblocking; /* whether this connection is using nonblock * sending semantics */ + PGpipelineStatus pipelineStatus; /* status of pipeline mode */ bool singleRowMode; /* return current query result row-by-row? */ char copy_is_binary; /* 1 = copy binary, 0 = copy text */ int copy_already_done; /* # bytes already returned in COPY OUT */ @@ -407,6 +424,19 @@ struct pg_conn pg_conn_host *connhost; /* details about each named host */ char *connip; /* IP address for current network connection */ + /* + * The pending command queue as a singly-linked list. Head is the command + * currently in execution, tail is where new commands are added. + */ + PGcmdQueueEntry *cmd_queue_head; + PGcmdQueueEntry *cmd_queue_tail; + + /* + * To save malloc traffic, we don't free entries right away; instead we + * save them in this list for possible reuse. + */ + PGcmdQueueEntry *cmd_queue_recycle; + /* Connection data */ pgsocket sock; /* FD for socket, PGINVALID_SOCKET if * unconnected */ @@ -622,6 +652,7 @@ extern void pqSaveMessageField(PGresult *res, char code, extern void pqSaveParameterStatus(PGconn *conn, const char *name, const char *value); extern int pqRowProcessor(PGconn *conn, const char **errmsgp); +extern void pqCommandQueueAdvance(PGconn *conn); extern int PQsendQueryContinue(PGconn *conn, const char *query); /* === in fe-protocol3.c === */ @@ -795,6 +826,11 @@ extern ssize_t pg_GSS_read(PGconn *conn, void *ptr, size_t len); */ #define pqIsnonblocking(conn) ((conn)->nonblocking) +/* + * Connection's outbuffer threshold, for pipeline mode. + */ +#define OUTBUFFER_THRESHOLD 65536 + #ifdef ENABLE_NLS extern char *libpq_gettext(const char *msgid) pg_attribute_format_arg(1); extern char *libpq_ngettext(const char *msgid, const char *msgid_plural, unsigned long n) pg_attribute_format_arg(1) pg_attribute_format_arg(2); diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile index 5391f461a2..93e7829c67 100644 --- a/src/test/modules/Makefile +++ b/src/test/modules/Makefile @@ -10,6 +10,7 @@ SUBDIRS = \ delay_execution \ dummy_index_am \ dummy_seclabel \ + libpq_pipeline \ plsample \ snapshot_too_old \ test_bloomfilter \ diff --git a/src/test/modules/libpq_pipeline/.gitignore b/src/test/modules/libpq_pipeline/.gitignore new file mode 100644 index 0000000000..3a11e786b8 --- /dev/null +++ b/src/test/modules/libpq_pipeline/.gitignore @@ -0,0 +1,5 @@ +# Generated subdirectories +/log/ +/results/ +/tmp_check/ +/libpq_pipeline diff --git a/src/test/modules/libpq_pipeline/Makefile b/src/test/modules/libpq_pipeline/Makefile new file mode 100644 index 0000000000..b798f5fbbc --- /dev/null +++ b/src/test/modules/libpq_pipeline/Makefile @@ -0,0 +1,20 @@ +# src/test/modules/libpq_pipeline/Makefile + +PROGRAM = libpq_pipeline +OBJS = libpq_pipeline.o + +PG_CPPFLAGS = -I$(libpq_srcdir) +PG_LIBS_INTERNAL += $(libpq_pgport) + +TAP_TESTS = 1 + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = src/test/modules/libpq_pipeline +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/src/test/modules/libpq_pipeline/README b/src/test/modules/libpq_pipeline/README new file mode 100644 index 0000000000..d8174dd579 --- /dev/null +++ b/src/test/modules/libpq_pipeline/README @@ -0,0 +1 @@ +Test programs and libraries for libpq diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c new file mode 100644 index 0000000000..03eb3df504 --- /dev/null +++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c @@ -0,0 +1,1303 @@ +/*------------------------------------------------------------------------- + * + * libpq_pipeline.c + * Verify libpq pipeline execution functionality + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/test/modules/libpq_pipeline/libpq_pipeline.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#include +#ifdef HAVE_SYS_SELECT_H +#include +#endif + +#include "catalog/pg_type_d.h" +#include "common/fe_memutils.h" +#include "libpq-fe.h" +#include "portability/instr_time.h" + + +static void exit_nicely(PGconn *conn); + +const char *const progname = "libpq_pipeline"; + + +#define DEBUG +#ifdef DEBUG +#define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0) +#else +#define pg_debug(...) +#endif + +static const char *const drop_table_sql = +"DROP TABLE IF EXISTS pq_pipeline_demo"; +static const char *const create_table_sql = +"CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer);"; +static const char *const insert_sql = +"INSERT INTO pq_pipeline_demo(itemno) VALUES ($1);"; + +/* max char length of an int32, plus sign and null terminator */ +#define MAXINTLEN 12 + +static void +exit_nicely(PGconn *conn) +{ + PQfinish(conn); + exit(1); +} + +/* + * Print an error to stderr and terminate the program. + */ +#define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__) +static void +pg_fatal_impl(int line, const char *fmt,...) +{ + va_list args; + + + fflush(stdout); + + fprintf(stderr, "\n%s:%d: ", progname, line); + va_start(args, fmt); + vfprintf(stderr, fmt, args); + va_end(args); + Assert(fmt[strlen(fmt) - 1] != '\n'); + fprintf(stderr, "\n"); + exit(1); +} + +static void +test_disallowed_in_pipeline(PGconn *conn) +{ + PGresult *res = NULL; + + fprintf(stderr, "test error cases... "); + + if (PQisnonblocking(conn)) + pg_fatal("Expected blocking connection mode"); + + if (PQenterPipelineMode(conn) != 1) + pg_fatal("Unable to enter pipeline mode"); + + if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF) + pg_fatal("Pipeline mode not activated properly"); + + /* PQexec should fail in pipeline mode */ + res = PQexec(conn, "SELECT 1"); + if (PQresultStatus(res) != PGRES_FATAL_ERROR) + pg_fatal("PQexec should fail in pipeline mode but succeeded"); + + /* Entering pipeline mode when already in pipeline mode is OK */ + if (PQenterPipelineMode(conn) != 1) + pg_fatal("re-entering pipeline mode should be a no-op but failed"); + + if (PQisBusy(conn) != 0) + pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1"); + + /* ok, back to normal command mode */ + if (PQexitPipelineMode(conn) != 1) + pg_fatal("couldn't exit idle empty pipeline mode"); + + if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF) + pg_fatal("Pipeline mode not terminated properly"); + + /* exiting pipeline mode when not in pipeline mode should be a no-op */ + if (PQexitPipelineMode(conn) != 1) + pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed"); + + /* can now PQexec again */ + res = PQexec(conn, "SELECT 1"); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s", + PQerrorMessage(conn)); + + fprintf(stderr, "ok\n"); +} + +static void +test_multi_pipelines(PGconn *conn) +{ + PGresult *res = NULL; + const char *dummy_params[1] = {"1"}; + Oid dummy_param_oids[1] = {INT4OID}; + + fprintf(stderr, "multi pipeline... "); + + /* + * Queue up a couple of small pipelines and process each without returning + * to command mode first. + */ + if (PQenterPipelineMode(conn) != 1) + pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn)); + + if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids, + dummy_params, NULL, NULL, 0) != 1) + pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn)); + + if (PQpipelineSync(conn) != 1) + pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn)); + + if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids, + dummy_params, NULL, NULL, 0) != 1) + pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn)); + + if (PQpipelineSync(conn) != 1) + pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn)); + + /* OK, start processing the results */ + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("PQgetResult returned null when there's a pipeline item: %s", + PQerrorMessage(conn)); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pg_fatal("Unexpected result code %s from first pipeline item", + PQresStatus(PQresultStatus(res))); + PQclear(res); + res = NULL; + + if (PQgetResult(conn) != NULL) + pg_fatal("PQgetResult returned something extra after first result"); + + if (PQexitPipelineMode(conn) != 0) + pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly"); + + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("PQgetResult returned null when sync result expected: %s", + PQerrorMessage(conn)); + + if (PQresultStatus(res) != PGRES_PIPELINE_SYNC) + pg_fatal("Unexpected result code %s instead of sync result, error: %s", + PQresStatus(PQresultStatus(res)), PQerrorMessage(conn)); + PQclear(res); + + /* second pipeline */ + + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("PQgetResult returned null when there's a pipeline item: %s", + PQerrorMessage(conn)); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pg_fatal("Unexpected result code %s from second pipeline item", + PQresStatus(PQresultStatus(res))); + + res = PQgetResult(conn); + if (res != NULL) + pg_fatal("Expected null result, got %s", + PQresStatus(PQresultStatus(res))); + + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("PQgetResult returned null when there's a pipeline item: %s", + PQerrorMessage(conn)); + + if (PQresultStatus(res) != PGRES_PIPELINE_SYNC) + pg_fatal("Unexpected result code %s from second pipeline sync", + PQresStatus(PQresultStatus(res))); + + /* We're still in pipeline mode ... */ + if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF) + pg_fatal("Fell out of pipeline mode somehow"); + + /* until we end it, which we can safely do now */ + if (PQexitPipelineMode(conn) != 1) + pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s", + PQerrorMessage(conn)); + + if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF) + pg_fatal("exiting pipeline mode didn't seem to work"); + + fprintf(stderr, "ok\n"); +} + +/* + * When an operation in a pipeline fails the rest of the pipeline is flushed. We + * still have to get results for each pipeline item, but the item will just be + * a PGRES_PIPELINE_ABORTED code. + * + * This intentionally doesn't use a transaction to wrap the pipeline. You should + * usually use an xact, but in this case we want to observe the effects of each + * statement. + */ +static void +test_pipeline_abort(PGconn *conn) +{ + PGresult *res = NULL; + const char *dummy_params[1] = {"1"}; + Oid dummy_param_oids[1] = {INT4OID}; + int i; + bool goterror; + + fprintf(stderr, "aborted pipeline... "); + + res = PQexec(conn, drop_table_sql); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn)); + + res = PQexec(conn, create_table_sql); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn)); + + /* + * Queue up a couple of small pipelines and process each without returning + * to command mode first. Make sure the second operation in the first + * pipeline ERRORs. + */ + if (PQenterPipelineMode(conn) != 1) + pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn)); + + dummy_params[0] = "1"; + if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids, + dummy_params, NULL, NULL, 0) != 1) + pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn)); + + if (PQsendQueryParams(conn, "SELECT no_such_function($1)", + 1, dummy_param_oids, dummy_params, + NULL, NULL, 0) != 1) + pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn)); + + dummy_params[0] = "2"; + if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids, + dummy_params, NULL, NULL, 0) != 1) + pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn)); + + if (PQpipelineSync(conn) != 1) + pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn)); + + dummy_params[0] = "3"; + if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids, + dummy_params, NULL, NULL, 0) != 1) + pg_fatal("dispatching second-pipeline insert failed: %s", + PQerrorMessage(conn)); + + if (PQpipelineSync(conn) != 1) + pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn)); + + /* + * OK, start processing the pipeline results. + * + * We should get a command-ok for the first query, then a fatal error and + * a pipeline aborted message for the second insert, a pipeline-end, then + * a command-ok and a pipeline-ok for the second pipeline operation. + */ + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn)); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("Unexpected result status %s: %s", + PQresStatus(PQresultStatus(res)), + PQresultErrorMessage(res)); + PQclear(res); + + /* NULL result to signal end-of-results for this command */ + if ((res = PQgetResult(conn)) != NULL) + pg_fatal("Expected null result, got %s", + PQresStatus(PQresultStatus(res))); + + /* Second query caused error, so we expect an error next */ + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn)); + if (PQresultStatus(res) != PGRES_FATAL_ERROR) + pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s", + PQresStatus(PQresultStatus(res))); + PQclear(res); + + /* NULL result to signal end-of-results for this command */ + if ((res = PQgetResult(conn)) != NULL) + pg_fatal("Expected null result, got %s", + PQresStatus(PQresultStatus(res))); + + /* + * pipeline should now be aborted. + * + * Note that we could still queue more queries at this point if we wanted; + * they'd get added to a new third pipeline since we've already sent a + * second. The aborted flag relates only to the pipeline being received. + */ + if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED) + pg_fatal("pipeline should be flagged as aborted but isn't"); + + /* third query in pipeline, the second insert */ + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn)); + if (PQresultStatus(res) != PGRES_PIPELINE_ABORTED) + pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s", + PQresStatus(PQresultStatus(res))); + PQclear(res); + + /* NULL result to signal end-of-results for this command */ + if ((res = PQgetResult(conn)) != NULL) + pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res))); + + if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED) + pg_fatal("pipeline should be flagged as aborted but isn't"); + + /* Ensure we're still in pipeline */ + if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF) + pg_fatal("Fell out of pipeline mode somehow"); + + /* + * The end of a failed pipeline is a PGRES_PIPELINE_SYNC. + * + * (This is so clients know to start processing results normally again and + * can tell the difference between skipped commands and the sync.) + */ + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn)); + if (PQresultStatus(res) != PGRES_PIPELINE_SYNC) + pg_fatal("Unexpected result code from first pipeline sync\n" + "Expected PGRES_PIPELINE_SYNC, got %s", + PQresStatus(PQresultStatus(res))); + PQclear(res); + + if (PQpipelineStatus(conn) == PQ_PIPELINE_ABORTED) + pg_fatal("sync should've cleared the aborted flag but didn't"); + + /* We're still in pipeline mode... */ + if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF) + pg_fatal("Fell out of pipeline mode somehow"); + + /* the insert from the second pipeline */ + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn)); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("Unexpected result code %s from first item in second pipeline", + PQresStatus(PQresultStatus(res))); + PQclear(res); + + /* Read the NULL result at the end of the command */ + if ((res = PQgetResult(conn)) != NULL) + pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res))); + + /* the second pipeline sync */ + if ((res = PQgetResult(conn)) == NULL) + pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn)); + if (PQresultStatus(res) != PGRES_PIPELINE_SYNC) + pg_fatal("Unexpected result code %s from second pipeline sync", + PQresStatus(PQresultStatus(res))); + PQclear(res); + + if ((res = PQgetResult(conn)) != NULL) + pg_fatal("Expected null result, got %s: %s", + PQresStatus(PQresultStatus(res)), + PQerrorMessage(conn)); + + /* Try to send two queries in one command */ + if (PQsendQuery(conn, "SELECT 1; SELECT 2") != 1) + pg_fatal("failed to send query: %s", PQerrorMessage(conn)); + if (PQpipelineSync(conn) != 1) + pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn)); + goterror = false; + while ((res = PQgetResult(conn)) != NULL) + { + switch (PQresultStatus(res)) + { + case PGRES_FATAL_ERROR: + if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0) + pg_fatal("expected error about multiple commands, got %s", + PQerrorMessage(conn)); + printf("got expected %s", PQerrorMessage(conn)); + goterror = true; + break; + default: + pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res))); + break; + } + } + if (!goterror) + pg_fatal("did not get cannot-insert-multiple-commands error"); + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("got NULL result"); + if (PQresultStatus(res) != PGRES_PIPELINE_SYNC) + pg_fatal("Unexpected result code %s from pipeline sync", + PQresStatus(PQresultStatus(res))); + + /* Test single-row mode with an error partways */ + if (PQsendQuery(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g") != 1) + pg_fatal("failed to send query: %s", PQerrorMessage(conn)); + if (PQpipelineSync(conn) != 1) + pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn)); + PQsetSingleRowMode(conn); + goterror = false; + while ((res = PQgetResult(conn)) != NULL) + { + switch (PQresultStatus(res)) + { + case PGRES_SINGLE_TUPLE: + printf("got row: %s\n", PQgetvalue(res, 0, 0)); + break; + case PGRES_FATAL_ERROR: + if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0) + pg_fatal("expected division-by-zero, got: %s (%s)", + PQerrorMessage(conn), + PQresultErrorField(res, PG_DIAG_SQLSTATE)); + printf("got expected division-by-zero\n"); + goterror = true; + break; + default: + pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res))); + } + PQclear(res); + } + if (!goterror) + pg_fatal("did not get division-by-zero error"); + /* the third pipeline sync */ + if ((res = PQgetResult(conn)) == NULL) + pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn)); + if (PQresultStatus(res) != PGRES_PIPELINE_SYNC) + pg_fatal("Unexpected result code %s from third pipeline sync", + PQresStatus(PQresultStatus(res))); + PQclear(res); + + /* We're still in pipeline mode... */ + if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF) + pg_fatal("Fell out of pipeline mode somehow"); + + /* until we end it, which we can safely do now */ + if (PQexitPipelineMode(conn) != 1) + pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s", + PQerrorMessage(conn)); + + if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF) + pg_fatal("exiting pipeline mode didn't seem to work"); + + fprintf(stderr, "ok\n"); + + /*- + * Since we fired the pipelines off without a surrounding xact, the results + * should be: + * + * - Implicit xact started by server around 1st pipeline + * - First insert applied + * - Second statement aborted xact + * - Third insert skipped + * - Sync rolled back first implicit xact + * - Implicit xact created by server around 2nd pipeline + * - insert applied from 2nd pipeline + * - Sync commits 2nd xact + * + * So we should only have the value 3 that we inserted. + */ + res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo"); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pg_fatal("Expected tuples, got %s: %s", + PQresStatus(PQresultStatus(res)), PQerrorMessage(conn)); + if (PQntuples(res) != 1) + pg_fatal("expected 1 result, got %d", PQntuples(res)); + for (i = 0; i < PQntuples(res); i++) + { + const char *val = PQgetvalue(res, i, 0); + + if (strcmp(val, "3") != 0) + pg_fatal("expected only insert with value 3, got %s", val); + } + + PQclear(res); +} + +/* State machine enum for test_pipelined_insert */ +enum PipelineInsertStep +{ + BI_BEGIN_TX, + BI_DROP_TABLE, + BI_CREATE_TABLE, + BI_PREPARE, + BI_INSERT_ROWS, + BI_COMMIT_TX, + BI_SYNC, + BI_DONE +}; + +static void +test_pipelined_insert(PGconn *conn, int n_rows) +{ + const char *insert_params[1]; + Oid insert_param_oids[1] = {INT4OID}; + char insert_param_0[MAXINTLEN]; + enum PipelineInsertStep send_step = BI_BEGIN_TX, + recv_step = BI_BEGIN_TX; + int rows_to_send, + rows_to_receive; + + insert_params[0] = &insert_param_0[0]; + + rows_to_send = rows_to_receive = n_rows; + + /* + * Do a pipelined insert into a table created at the start of the pipeline + */ + if (PQenterPipelineMode(conn) != 1) + pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn)); + + while (send_step != BI_PREPARE) + { + const char *sql; + + switch (send_step) + { + case BI_BEGIN_TX: + sql = "BEGIN TRANSACTION"; + send_step = BI_DROP_TABLE; + break; + + case BI_DROP_TABLE: + sql = drop_table_sql; + send_step = BI_CREATE_TABLE; + break; + + case BI_CREATE_TABLE: + sql = create_table_sql; + send_step = BI_PREPARE; + break; + + default: + pg_fatal("invalid state"); + } + + pg_debug("sending: %s\n", sql); + if (PQsendQueryParams(conn, sql, + 0, NULL, NULL, NULL, NULL, 0) != 1) + pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn)); + } + + Assert(send_step == BI_PREPARE); + pg_debug("sending: %s\n", insert_sql); + if (PQsendPrepare(conn, "my_insert", insert_sql, 1, insert_param_oids) != 1) + pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn)); + send_step = BI_INSERT_ROWS; + + /* + * Now we start inserting. We'll be sending enough data that we could fill + * our output buffer, so to avoid deadlocking we need to enter nonblocking + * mode and consume input while we send more output. As results of each + * query are processed we should pop them to allow processing of the next + * query. There's no need to finish the pipeline before processing + * results. + */ + if (PQsetnonblocking(conn, 1) != 0) + pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn)); + + while (recv_step != BI_DONE) + { + int sock; + fd_set input_mask; + fd_set output_mask; + + sock = PQsocket(conn); + + if (sock < 0) + break; /* shouldn't happen */ + + FD_ZERO(&input_mask); + FD_SET(sock, &input_mask); + FD_ZERO(&output_mask); + FD_SET(sock, &output_mask); + + if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0) + { + fprintf(stderr, "select() failed: %s\n", strerror(errno)); + exit_nicely(conn); + } + + /* + * Process any results, so we keep the server's output buffer free + * flowing and it can continue to process input + */ + if (FD_ISSET(sock, &input_mask)) + { + PQconsumeInput(conn); + + /* Read until we'd block if we tried to read */ + while (!PQisBusy(conn) && recv_step < BI_DONE) + { + PGresult *res; + const char *cmdtag; + const char *description = ""; + int status; + + /* + * Read next result. If no more results from this query, + * advance to the next query + */ + res = PQgetResult(conn); + if (res == NULL) + continue; + + status = PGRES_COMMAND_OK; + switch (recv_step) + { + case BI_BEGIN_TX: + cmdtag = "BEGIN"; + recv_step++; + break; + case BI_DROP_TABLE: + cmdtag = "DROP TABLE"; + recv_step++; + break; + case BI_CREATE_TABLE: + cmdtag = "CREATE TABLE"; + recv_step++; + break; + case BI_PREPARE: + cmdtag = ""; + description = "PREPARE"; + recv_step++; + break; + case BI_INSERT_ROWS: + cmdtag = "INSERT"; + rows_to_receive--; + if (rows_to_receive == 0) + recv_step++; + break; + case BI_COMMIT_TX: + cmdtag = "COMMIT"; + recv_step++; + break; + case BI_SYNC: + cmdtag = ""; + description = "SYNC"; + status = PGRES_PIPELINE_SYNC; + recv_step++; + break; + case BI_DONE: + /* unreachable */ + description = ""; + abort(); + } + + if (PQresultStatus(res) != status) + pg_fatal("%s reported status %s, expected %s\n" + "Error message: \"%s\"", + description, PQresStatus(PQresultStatus(res)), + PQresStatus(status), PQerrorMessage(conn)); + + if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0) + pg_fatal("%s expected command tag '%s', got '%s'", + description, cmdtag, PQcmdStatus(res)); + + pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description); + + PQclear(res); + } + } + + /* Write more rows and/or the end pipeline message, if needed */ + if (FD_ISSET(sock, &output_mask)) + { + PQflush(conn); + + if (send_step == BI_INSERT_ROWS) + { + snprintf(&insert_param_0[0], MAXINTLEN, "%d", rows_to_send); + + if (PQsendQueryPrepared(conn, "my_insert", + 1, insert_params, NULL, NULL, 0) == 1) + { + pg_debug("sent row %d\n", rows_to_send); + + rows_to_send--; + if (rows_to_send == 0) + send_step++; + } + else + { + /* + * in nonblocking mode, so it's OK for an insert to fail + * to send + */ + fprintf(stderr, "WARNING: failed to send insert #%d: %s\n", + rows_to_send, PQerrorMessage(conn)); + } + } + else if (send_step == BI_COMMIT_TX) + { + if (PQsendQueryParams(conn, "COMMIT", + 0, NULL, NULL, NULL, NULL, 0) == 1) + { + pg_debug("sent COMMIT\n"); + send_step++; + } + else + { + fprintf(stderr, "WARNING: failed to send commit: %s\n", + PQerrorMessage(conn)); + } + } + else if (send_step == BI_SYNC) + { + if (PQpipelineSync(conn) == 1) + { + fprintf(stdout, "pipeline sync sent\n"); + send_step++; + } + else + { + fprintf(stderr, "WARNING: pipeline sync failed: %s\n", + PQerrorMessage(conn)); + } + } + } + } + + /* We've got the sync message and the pipeline should be done */ + if (PQexitPipelineMode(conn) != 1) + pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s", + PQerrorMessage(conn)); + + if (PQsetnonblocking(conn, 0) != 0) + pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn)); + + fprintf(stderr, "ok\n"); +} + +static void +test_prepared(PGconn *conn) +{ + PGresult *res = NULL; + Oid param_oids[1] = {INT4OID}; + Oid expected_oids[4]; + Oid typ; + + fprintf(stderr, "prepared... "); + + if (PQenterPipelineMode(conn) != 1) + pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn)); + if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, " + "interval '1 sec'", + 1, param_oids) != 1) + pg_fatal("preparing query failed: %s", PQerrorMessage(conn)); + expected_oids[0] = INT4OID; + expected_oids[1] = TEXTOID; + expected_oids[2] = NUMERICOID; + expected_oids[3] = INTERVALOID; + if (PQsendDescribePrepared(conn, "select_one") != 1) + pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn)); + if (PQpipelineSync(conn) != 1) + pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn)); + + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("PQgetResult returned null"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res))); + PQclear(res); + res = PQgetResult(conn); + if (res != NULL) + pg_fatal("expected NULL result"); + + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("PQgetResult returned NULL"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res))); + if (PQnfields(res) != lengthof(expected_oids)) + pg_fatal("expected %d columns, got %d", + lengthof(expected_oids), PQnfields(res)); + for (int i = 0; i < PQnfields(res); i++) + { + typ = PQftype(res, i); + if (typ != expected_oids[i]) + pg_fatal("field %d: expected type %u, got %u", + i, expected_oids[i], typ); + } + PQclear(res); + res = PQgetResult(conn); + if (res != NULL) + pg_fatal("expected NULL result"); + + res = PQgetResult(conn); + if (PQresultStatus(res) != PGRES_PIPELINE_SYNC) + pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res))); + + if (PQexitPipelineMode(conn) != 1) + pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn)); + + PQexec(conn, "BEGIN"); + PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1"); + PQenterPipelineMode(conn); + if (PQsendDescribePortal(conn, "cursor_one") != 1) + pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn)); + if (PQpipelineSync(conn) != 1) + pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn)); + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("expected NULL result"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res))); + + typ = PQftype(res, 0); + if (typ != INT4OID) + pg_fatal("portal: expected type %u, got %u", + INT4OID, typ); + PQclear(res); + res = PQgetResult(conn); + if (res != NULL) + pg_fatal("expected NULL result"); + res = PQgetResult(conn); + if (PQresultStatus(res) != PGRES_PIPELINE_SYNC) + pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res))); + + if (PQexitPipelineMode(conn) != 1) + pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn)); + + fprintf(stderr, "ok\n"); +} + +static void +test_simple_pipeline(PGconn *conn) +{ + PGresult *res = NULL; + const char *dummy_params[1] = {"1"}; + Oid dummy_param_oids[1] = {INT4OID}; + + fprintf(stderr, "simple pipeline... "); + + /* + * Enter pipeline mode and dispatch a set of operations, which we'll then + * process the results of as they come in. + * + * For a simple case we should be able to do this without interim + * processing of results since our output buffer will give us enough slush + * to work with and we won't block on sending. So blocking mode is fine. + */ + if (PQisnonblocking(conn)) + pg_fatal("Expected blocking connection mode"); + + if (PQenterPipelineMode(conn) != 1) + pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn)); + + if (PQsendQueryParams(conn, "SELECT $1", + 1, dummy_param_oids, dummy_params, + NULL, NULL, 0) != 1) + pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn)); + + if (PQexitPipelineMode(conn) != 0) + pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded"); + + if (PQpipelineSync(conn) != 1) + pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn)); + + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("PQgetResult returned null when there's a pipeline item: %s", + PQerrorMessage(conn)); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pg_fatal("Unexpected result code %s from first pipeline item", + PQresStatus(PQresultStatus(res))); + + PQclear(res); + res = NULL; + + if (PQgetResult(conn) != NULL) + pg_fatal("PQgetResult returned something extra after first query result."); + + /* + * Even though we've processed the result there's still a sync to come and + * we can't exit pipeline mode yet + */ + if (PQexitPipelineMode(conn) != 0) + pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly"); + + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s", + PQerrorMessage(conn)); + + if (PQresultStatus(res) != PGRES_PIPELINE_SYNC) + pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s", + PQresStatus(PQresultStatus(res)), PQerrorMessage(conn)); + + PQclear(res); + res = NULL; + + if (PQgetResult(conn) != NULL) + pg_fatal("PQgetResult returned something extra after pipeline end: %s", + PQresStatus(PQresultStatus(res))); + + /* We're still in pipeline mode... */ + if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF) + pg_fatal("Fell out of pipeline mode somehow"); + + /* ... until we end it, which we can safely do now */ + if (PQexitPipelineMode(conn) != 1) + pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s", + PQerrorMessage(conn)); + + if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF) + pg_fatal("Exiting pipeline mode didn't seem to work"); + + fprintf(stderr, "ok\n"); +} + +static void +test_singlerowmode(PGconn *conn) +{ + PGresult *res; + int i; + bool pipeline_ended = false; + + /* 1 pipeline, 3 queries in it */ + if (PQenterPipelineMode(conn) != 1) + pg_fatal("failed to enter pipeline mode: %s", + PQerrorMessage(conn)); + + for (i = 0; i < 3; i++) + { + char *param[1]; + + param[0] = psprintf("%d", 44 + i); + + if (PQsendQueryParams(conn, + "SELECT generate_series(42, $1)", + 1, + NULL, + (const char **) param, + NULL, + NULL, + 0) != 1) + pg_fatal("failed to send query: %s", + PQerrorMessage(conn)); + pfree(param[0]); + } + if (PQpipelineSync(conn) != 1) + pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn)); + + for (i = 0; !pipeline_ended; i++) + { + bool first = true; + bool saw_ending_tuplesok; + bool isSingleTuple = false; + + /* Set single row mode for only first 2 SELECT queries */ + if (i < 2) + { + if (PQsetSingleRowMode(conn) != 1) + pg_fatal("PQsetSingleRowMode() failed for i=%d", i); + } + + /* Consume rows for this query */ + saw_ending_tuplesok = false; + while ((res = PQgetResult(conn)) != NULL) + { + ExecStatusType est = PQresultStatus(res); + + if (est == PGRES_PIPELINE_SYNC) + { + fprintf(stderr, "end of pipeline reached\n"); + pipeline_ended = true; + PQclear(res); + if (i != 3) + pg_fatal("Expected three results, got %d", i); + break; + } + + /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */ + if (first) + { + if (i <= 1 && est != PGRES_SINGLE_TUPLE) + pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s", + i, PQresStatus(est)); + if (i >= 2 && est != PGRES_TUPLES_OK) + pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s", + i, PQresStatus(est)); + first = false; + } + + fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i); + switch (est) + { + case PGRES_TUPLES_OK: + fprintf(stderr, ", tuples: %d\n", PQntuples(res)); + saw_ending_tuplesok = true; + if (isSingleTuple) + { + if (PQntuples(res) == 0) + fprintf(stderr, "all tuples received in query %d\n", i); + else + pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead"); + } + break; + + case PGRES_SINGLE_TUPLE: + isSingleTuple = true; + fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0)); + break; + + default: + pg_fatal("unexpected"); + } + PQclear(res); + } + if (!pipeline_ended && !saw_ending_tuplesok) + pg_fatal("didn't get expected terminating TUPLES_OK"); + } + + if (PQexitPipelineMode(conn) != 1) + pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn)); +} + +/* + * Simple test to verify that a pipeline is discarded as a whole when there's + * an error, ignoring transaction commands. + */ +static void +test_transaction(PGconn *conn) +{ + PGresult *res; + bool expect_null; + int num_syncs = 0; + + res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;" + "CREATE TABLE pq_pipeline_tst (id int)"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("failed to create test table: %s", + PQerrorMessage(conn)); + PQclear(res); + + if (PQenterPipelineMode(conn) != 1) + pg_fatal("failed to enter pipeline mode: %s", + PQerrorMessage(conn)); + if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1) + pg_fatal("could not send prepare on pipeline: %s", + PQerrorMessage(conn)); + + if (PQsendQueryParams(conn, + "BEGIN", + 0, NULL, NULL, NULL, NULL, 0) != 1) + pg_fatal("failed to send query: %s", + PQerrorMessage(conn)); + if (PQsendQueryParams(conn, + "SELECT 0/0", + 0, NULL, NULL, NULL, NULL, 0) != 1) + pg_fatal("failed to send query: %s", + PQerrorMessage(conn)); + + /* + * send a ROLLBACK using a prepared stmt. Doesn't work because we need to + * get out of the pipeline-aborted state first. + */ + if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1) + pg_fatal("failed to execute prepared: %s", + PQerrorMessage(conn)); + + /* This insert fails because we're in pipeline-aborted state */ + if (PQsendQueryParams(conn, + "INSERT INTO pq_pipeline_tst VALUES (1)", + 0, NULL, NULL, NULL, NULL, 0) != 1) + pg_fatal("failed to send query: %s", + PQerrorMessage(conn)); + if (PQpipelineSync(conn) != 1) + pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn)); + num_syncs++; + + /* + * This insert fails even though the pipeline got a SYNC, because we're in + * an aborted transaction + */ + if (PQsendQueryParams(conn, + "INSERT INTO pq_pipeline_tst VALUES (2)", + 0, NULL, NULL, NULL, NULL, 0) != 1) + pg_fatal("failed to send query: %s", + PQerrorMessage(conn)); + if (PQpipelineSync(conn) != 1) + pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn)); + num_syncs++; + + /* + * Send ROLLBACK using prepared stmt. This one works because we just did + * PQpipelineSync above. + */ + if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1) + pg_fatal("failed to execute prepared: %s", + PQerrorMessage(conn)); + + /* + * Now that we're out of a transaction and in pipeline-good mode, this + * insert works + */ + if (PQsendQueryParams(conn, + "INSERT INTO pq_pipeline_tst VALUES (3)", + 0, NULL, NULL, NULL, NULL, 0) != 1) + pg_fatal("failed to send query: %s", + PQerrorMessage(conn)); + /* Send two syncs now -- match up to SYNC messages below */ + if (PQpipelineSync(conn) != 1) + pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn)); + num_syncs++; + if (PQpipelineSync(conn) != 1) + pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn)); + num_syncs++; + + expect_null = false; + for (int i = 0;; i++) + { + ExecStatusType restype; + + res = PQgetResult(conn); + if (res == NULL) + { + printf("%d: got NULL result\n", i); + if (!expect_null) + pg_fatal("did not expect NULL here"); + expect_null = false; + continue; + } + restype = PQresultStatus(res); + printf("%d: got status %s", i, PQresStatus(restype)); + if (expect_null) + pg_fatal("expected NULL"); + if (restype == PGRES_FATAL_ERROR) + printf("; error: %s", PQerrorMessage(conn)); + else if (restype == PGRES_PIPELINE_ABORTED) + { + printf(": command didn't run because pipeline aborted\n"); + } + else + printf("\n"); + PQclear(res); + + if (restype == PGRES_PIPELINE_SYNC) + num_syncs--; + else + expect_null = true; + if (num_syncs <= 0) + break; + } + if (PQgetResult(conn) != NULL) + pg_fatal("returned something extra after all the syncs: %s", + PQresStatus(PQresultStatus(res))); + + if (PQexitPipelineMode(conn) != 1) + pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn)); + + /* We expect to find one tuple containing the value "3" */ + res = PQexec(conn, "SELECT * FROM pq_pipeline_tst"); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pg_fatal("failed to obtain result: %s", PQerrorMessage(conn)); + if (PQntuples(res) != 1) + pg_fatal("did not get 1 tuple"); + if (strcmp(PQgetvalue(res, 0, 0), "3") != 0) + pg_fatal("did not get expected tuple"); + PQclear(res); + + fprintf(stderr, "ok\n"); +} + +static void +usage(const char *progname) +{ + fprintf(stderr, "%s tests libpq's pipeline mode.\n\n", progname); + fprintf(stderr, "Usage:\n"); + fprintf(stderr, " %s tests", progname); + fprintf(stderr, " %s testname [conninfo [number_of_rows]]\n", progname); +} + +static void +print_test_list(void) +{ + printf("disallowed_in_pipeline\n"); + printf("multi_pipelines\n"); + printf("pipeline_abort\n"); + printf("pipelined_insert\n"); + printf("prepared\n"); + printf("simple_pipeline\n"); + printf("singlerow\n"); + printf("transaction\n"); +} + +int +main(int argc, char **argv) +{ + const char *conninfo = ""; + PGconn *conn; + int numrows = 10000; + PGresult *res; + + if (strcmp(argv[1], "tests") == 0) + { + print_test_list(); + exit(0); + } + + /* + * The testname parameter is mandatory; it can be followed by a conninfo + * string and number of rows. + */ + if (argc < 2 || argc > 4) + { + usage(argv[0]); + exit(1); + } + + if (argc >= 3) + conninfo = pg_strdup(argv[2]); + + if (argc >= 4) + { + errno = 0; + numrows = strtol(argv[3], NULL, 10); + if (errno != 0 || numrows <= 0) + { + fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n", argv[3]); + exit(1); + } + } + + /* Make a connection to the database */ + conn = PQconnectdb(conninfo); + if (PQstatus(conn) != CONNECTION_OK) + { + fprintf(stderr, "Connection to database failed: %s\n", + PQerrorMessage(conn)); + exit_nicely(conn); + } + res = PQexec(conn, "SET lc_messages TO \"C\""); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("failed to set lc_messages: %s", PQerrorMessage(conn)); + + if (strcmp(argv[1], "disallowed_in_pipeline") == 0) + test_disallowed_in_pipeline(conn); + else if (strcmp(argv[1], "multi_pipelines") == 0) + test_multi_pipelines(conn); + else if (strcmp(argv[1], "pipeline_abort") == 0) + test_pipeline_abort(conn); + else if (strcmp(argv[1], "pipelined_insert") == 0) + test_pipelined_insert(conn, numrows); + else if (strcmp(argv[1], "prepared") == 0) + test_prepared(conn); + else if (strcmp(argv[1], "simple_pipeline") == 0) + test_simple_pipeline(conn); + else if (strcmp(argv[1], "singlerow") == 0) + test_singlerowmode(conn); + else if (strcmp(argv[1], "transaction") == 0) + test_transaction(conn); + else + { + fprintf(stderr, "\"%s\" is not a recognized test name\n", argv[1]); + usage(argv[0]); + exit(1); + } + + /* close the connection to the database and cleanup */ + PQfinish(conn); + return 0; +} diff --git a/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl b/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl new file mode 100644 index 0000000000..ba15b64ca7 --- /dev/null +++ b/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl @@ -0,0 +1,28 @@ +use strict; +use warnings; + +use Config; +use PostgresNode; +use TestLib; +use Test::More tests => 8; +use Cwd; + +my $node = get_new_node('main'); +$node->init; +$node->start; + +my $numrows = 10000; +$ENV{PATH} = "$ENV{PATH}:" . getcwd(); + +my ($out, $err) = run_command(['libpq_pipeline', 'tests']); +die "oops: $err" unless $err eq ''; +my @tests = split(/\s/, $out); + +for my $testname (@tests) +{ + $node->command_ok( + [ 'libpq_pipeline', $testname, $node->connstr('postgres'), $numrows ], + "libpq_pipeline $testname"); +} + +$node->stop('fast'); diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm index 74fde40e3a..a184404e21 100644 --- a/src/tools/msvc/Mkvcbuild.pm +++ b/src/tools/msvc/Mkvcbuild.pm @@ -33,10 +33,11 @@ my @unlink_on_exit; # Set of variables for modules in contrib/ and src/test/modules/ my $contrib_defines = { 'refint' => 'REFINT_VERBOSE' }; -my @contrib_uselibpq = ('dblink', 'oid2name', 'postgres_fdw', 'vacuumlo'); -my @contrib_uselibpgport = ('oid2name', 'vacuumlo'); -my @contrib_uselibpgcommon = ('oid2name', 'vacuumlo'); -my $contrib_extralibs = undef; +my @contrib_uselibpq = + ('dblink', 'oid2name', 'postgres_fdw', 'vacuumlo', 'libpq_pipeline'); +my @contrib_uselibpgport = ('libpq_pipeline', 'oid2name', 'vacuumlo'); +my @contrib_uselibpgcommon = ('libpq_pipeline', 'oid2name', 'vacuumlo'); +my $contrib_extralibs = { 'libpq_pipeline' => ['ws2_32.lib'] }; my $contrib_extraincludes = { 'dblink' => ['src/backend'] }; my $contrib_extrasource = { 'cube' => [ 'contrib/cube/cubescan.l', 'contrib/cube/cubeparse.y' ], diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 61cf4eae1f..9e6777e9d0 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1563,10 +1563,12 @@ PG_Locale_Strategy PG_Lock_Status PG_init_t PGcancel +PGcmdQueueEntry PGconn PGdataValue PGlobjfuncs PGnotify +PGpipelineStatus PGresAttDesc PGresAttValue PGresParamDesc