diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index e69feacfe6..aadd5d2581 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -3588,6 +3588,20 @@ ExecStatusType PQresultStatus(const PGresult *res); + + PGRES_TUPLES_CHUNK + + + The PGresult contains several result tuples + from the current command. This status occurs only when + chunked mode has been selected for the query + (see ). + The number of tuples will not exceed the limit passed to + . + + + + PGRES_PIPELINE_SYNC @@ -3617,8 +3631,9 @@ ExecStatusType PQresultStatus(const PGresult *res); - If the result status is PGRES_TUPLES_OK or - PGRES_SINGLE_TUPLE, then + If the result status is PGRES_TUPLES_OK, + PGRES_SINGLE_TUPLE, or + PGRES_TUPLES_CHUNK, then the functions described below can be used to retrieve the rows returned by the query. Note that a SELECT command that happens to retrieve zero rows still shows @@ -4030,7 +4045,9 @@ void PQclear(PGresult *res); These functions are used to extract information from a PGresult object that represents a successful query result (that is, one that has status - PGRES_TUPLES_OK or PGRES_SINGLE_TUPLE). + PGRES_TUPLES_OK, + PGRES_SINGLE_TUPLE, or + PGRES_TUPLES_CHUNK). They can also be used to extract information from a successful Describe operation: a Describe's result has all the same column information that actual execution of the query @@ -5235,7 +5252,8 @@ PGresult *PQgetResult(PGconn *conn); Another frequently-desired feature that can be obtained with and - is retrieving large query results a row at a time. This is discussed + is retrieving large query results a limited number of rows at a time. + This is discussed in . @@ -5599,15 +5617,6 @@ int PQflush(PGconn *conn); queries in the pipeline; see . - - To enter single-row mode, call PQsetSingleRowMode - before retrieving results with PQgetResult. - This mode selection is effective only for the query currently - being processed. For more information on the use of - PQsetSingleRowMode, - refer to . - - PQgetResult behaves the same as for normal asynchronous processing except that it may contain the new @@ -5972,36 +5981,49 @@ UPDATE mytable SET x = x + 1 WHERE id = 42; + - Retrieving Query Results Row-by-Row + Retrieving Query Results in Chunks libpq single-row mode + + libpq + chunked mode + + Ordinarily, libpq collects an SQL command's entire result and returns it to the application as a single PGresult. This can be unworkable for commands that return a large number of rows. For such cases, applications can use and in - single-row mode. In this mode, the result row(s) are - returned to the application one at a time, as they are received from the - server. + single-row mode or chunked + mode. In these modes, result row(s) are returned to the + application as they are received from the server, one at a time for + single-row mode or in groups for chunked mode. - To enter single-row mode, call + To enter one of these modes, call + or immediately after a successful call of (or a sibling function). This mode selection is effective only for the currently executing query. Then call repeatedly, until it returns null, as documented in . If the query returns any rows, they are returned - as individual PGresult objects, which look like + as one or more PGresult objects, which look like normal query results except for having status code - PGRES_SINGLE_TUPLE instead of - PGRES_TUPLES_OK. After the last row, or immediately if + PGRES_SINGLE_TUPLE for single-row mode or + PGRES_TUPLES_CHUNK for chunked mode, instead of + PGRES_TUPLES_OK. There is exactly one result row in + each PGRES_SINGLE_TUPLE object, while + a PGRES_TUPLES_CHUNK object contains at least one + row but not more than the specified number of rows per chunk. + After the last row, or immediately if the query returns zero rows, a zero-row object with status PGRES_TUPLES_OK is returned; this is the signal that no more rows will arrive. (But note that it is still necessary to continue @@ -6013,9 +6035,9 @@ UPDATE mytable SET x = x + 1 WHERE id = 42; - When using pipeline mode, single-row mode needs to be activated for each - query in the pipeline before retrieving results for that query - with PQgetResult. + When using pipeline mode, single-row or chunked mode needs to be + activated for each query in the pipeline before retrieving results for + that query with PQgetResult. See for more information. @@ -6046,6 +6068,36 @@ int PQsetSingleRowMode(PGconn *conn); + + + PQsetChunkedRowsModePQsetChunkedRowsMode + + + + Select chunked mode for the currently-executing query. + + +int PQsetChunkedRowsMode(PGconn *conn, int chunkSize); + + + + + This function is similar to + , except that it + specifies retrieval of up to chunkSize rows + per PGresult, not necessarily just one row. + This function can only be called immediately after + or one of its sibling functions, + before any other operation on the connection such as + or + . If called at the correct time, + the function activates chunked mode for the current query and + returns 1. Otherwise the mode stays unchanged and the function + returns 0. In any case, the mode reverts to normal after + completion of the current query. + + + @@ -6054,9 +6106,10 @@ int PQsetSingleRowMode(PGconn *conn); While processing a query, the server may return some rows and then encounter an error, causing the query to be aborted. Ordinarily, libpq discards any such rows and reports only the - error. But in single-row mode, those rows will have already been - returned to the application. Hence, the application will see some - PGRES_SINGLE_TUPLE PGresult + error. But in single-row or chunked mode, some rows may have already + been returned to the application. Hence, the application will see some + PGRES_SINGLE_TUPLE or PGRES_TUPLES_CHUNK + PGresult objects followed by a PGRES_FATAL_ERROR object. For proper transactional behavior, the application must be designed to discard or undo whatever has been done with the previously-processed diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 761bf0f677..3c2b1bb496 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -1248,8 +1248,9 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query, switch (PQresultStatus(pgres)) { - case PGRES_SINGLE_TUPLE: case PGRES_TUPLES_OK: + case PGRES_SINGLE_TUPLE: + case PGRES_TUPLES_CHUNK: walres->status = WALRCV_OK_TUPLES; libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes); break; diff --git a/src/bin/pg_amcheck/pg_amcheck.c b/src/bin/pg_amcheck/pg_amcheck.c index e5f9eedc47..7e3101704d 100644 --- a/src/bin/pg_amcheck/pg_amcheck.c +++ b/src/bin/pg_amcheck/pg_amcheck.c @@ -991,6 +991,7 @@ should_processing_continue(PGresult *res) case PGRES_SINGLE_TUPLE: case PGRES_PIPELINE_SYNC: case PGRES_PIPELINE_ABORTED: + case PGRES_TUPLES_CHUNK: return false; } return true; diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index 1e48d37677..8ee0811510 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -203,3 +203,4 @@ PQcancelErrorMessage 200 PQcancelReset 201 PQcancelFinish 202 PQsocketPoll 203 +PQsetChunkedRowsMode 204 diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index c02a9180b2..7bdfc4c21a 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -41,7 +41,8 @@ char *const pgresStatus[] = { "PGRES_COPY_BOTH", "PGRES_SINGLE_TUPLE", "PGRES_PIPELINE_SYNC", - "PGRES_PIPELINE_ABORTED" + "PGRES_PIPELINE_ABORTED", + "PGRES_TUPLES_CHUNK" }; /* We return this if we're unable to make a PGresult at all */ @@ -200,6 +201,7 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status) case PGRES_COPY_IN: case PGRES_COPY_BOTH: case PGRES_SINGLE_TUPLE: + case PGRES_TUPLES_CHUNK: /* non-error cases */ break; default: @@ -771,7 +773,7 @@ PQclear(PGresult *res) /* * Handy subroutine to deallocate any partially constructed async result. * - * Any "next" result gets cleared too. + * Any "saved" result gets cleared too. */ void pqClearAsyncResult(PGconn *conn) @@ -779,8 +781,8 @@ pqClearAsyncResult(PGconn *conn) PQclear(conn->result); conn->result = NULL; conn->error_result = false; - PQclear(conn->next_result); - conn->next_result = NULL; + PQclear(conn->saved_result); + conn->saved_result = NULL; } /* @@ -911,14 +913,14 @@ pqPrepareAsyncResult(PGconn *conn) } /* - * Replace conn->result with next_result, if any. In the normal case - * there isn't a next result and we're just dropping ownership of the - * current result. In single-row mode this restores the situation to what - * it was before we created the current single-row result. + * Replace conn->result with saved_result, if any. In the normal case + * there isn't a saved result and we're just dropping ownership of the + * current result. In partial-result mode this restores the situation to + * what it was before we created the current partial result. */ - conn->result = conn->next_result; - conn->error_result = false; /* next_result is never an error */ - conn->next_result = NULL; + conn->result = conn->saved_result; + conn->error_result = false; /* saved_result is never an error */ + conn->saved_result = NULL; return res; } @@ -1199,11 +1201,6 @@ pqSaveParameterStatus(PGconn *conn, const char *name, const char *value) * On error, *errmsgp can be set to an error string to be returned. * (Such a string should already be translated via libpq_gettext().) * If it is left NULL, the error is presumed to be "out of memory". - * - * In single-row mode, we create a new result holding just the current row, - * stashing the previous result in conn->next_result so that it becomes - * active again after pqPrepareAsyncResult(). This allows the result metadata - * (column descriptions) to be carried forward to each result row. */ int pqRowProcessor(PGconn *conn, const char **errmsgp) @@ -1215,11 +1212,14 @@ pqRowProcessor(PGconn *conn, const char **errmsgp) int i; /* - * In single-row mode, make a new PGresult that will hold just this one - * row; the original conn->result is left unchanged so that it can be used - * again as the template for future rows. + * In partial-result mode, if we don't already have a partial PGresult + * then make one by cloning conn->result (which should hold the correct + * result metadata by now). Then the original conn->result is moved over + * to saved_result so that we can re-use it as a reference for future + * partial results. The saved result will become active again after + * pqPrepareAsyncResult() returns the partial result to the application. */ - if (conn->singleRowMode) + if (conn->partialResMode && conn->saved_result == NULL) { /* Copy everything that should be in the result at this point */ res = PQcopyResult(res, @@ -1227,6 +1227,11 @@ pqRowProcessor(PGconn *conn, const char **errmsgp) PG_COPYRES_NOTICEHOOKS); if (!res) return 0; + /* Change result status to appropriate special value */ + res->resultStatus = (conn->singleRowMode ? PGRES_SINGLE_TUPLE : PGRES_TUPLES_CHUNK); + /* And stash it as the active result */ + conn->saved_result = conn->result; + conn->result = res; } /* @@ -1241,7 +1246,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp) tup = (PGresAttValue *) pqResultAlloc(res, nfields * sizeof(PGresAttValue), true); if (tup == NULL) - goto fail; + return 0; for (i = 0; i < nfields; i++) { @@ -1260,7 +1265,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp) val = (char *) pqResultAlloc(res, clen + 1, isbinary); if (val == NULL) - goto fail; + return 0; /* copy and zero-terminate the data (even if it's binary) */ memcpy(val, columns[i].value, clen); @@ -1273,30 +1278,16 @@ pqRowProcessor(PGconn *conn, const char **errmsgp) /* And add the tuple to the PGresult's tuple array */ if (!pqAddTuple(res, tup, errmsgp)) - goto fail; + return 0; /* - * Success. In single-row mode, make the result available to the client - * immediately. + * Success. In partial-result mode, if we have enough rows then make the + * result available to the client immediately. */ - if (conn->singleRowMode) - { - /* Change result status to special single-row value */ - res->resultStatus = PGRES_SINGLE_TUPLE; - /* Stash old result for re-use later */ - conn->next_result = conn->result; - conn->result = res; - /* And mark the result ready to return */ + if (conn->partialResMode && res->ntups >= conn->maxChunkSize) conn->asyncStatus = PGASYNC_READY_MORE; - } return 1; - -fail: - /* release locally allocated PGresult, if we made one */ - if (res != conn->result) - PQclear(res); - return 0; } @@ -1745,8 +1736,10 @@ PQsendQueryStart(PGconn *conn, bool newQuery) */ pqClearAsyncResult(conn); - /* reset single-row processing mode */ + /* reset partial-result mode */ + conn->partialResMode = false; conn->singleRowMode = false; + conn->maxChunkSize = 0; } /* ready to send command message */ @@ -1925,30 +1918,61 @@ sendFailed: return 0; } +/* + * Is it OK to change partial-result mode now? + */ +static bool +canChangeResultMode(PGconn *conn) +{ + /* + * Only allow changing the mode when we have launched a query and not yet + * received any results. + */ + if (!conn) + return false; + if (conn->asyncStatus != PGASYNC_BUSY) + return false; + if (!conn->cmd_queue_head || + (conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE && + conn->cmd_queue_head->queryclass != PGQUERY_EXTENDED)) + return false; + if (pgHavePendingResult(conn)) + return false; + return true; +} + /* * Select row-by-row processing mode */ int PQsetSingleRowMode(PGconn *conn) { - /* - * Only allow setting the flag when we have launched a query and not yet - * received any results. - */ - if (!conn) - return 0; - if (conn->asyncStatus != PGASYNC_BUSY) - return 0; - if (!conn->cmd_queue_head || - (conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE && - conn->cmd_queue_head->queryclass != PGQUERY_EXTENDED)) - return 0; - if (pgHavePendingResult(conn)) + if (canChangeResultMode(conn)) + { + conn->partialResMode = true; + conn->singleRowMode = true; + conn->maxChunkSize = 1; + return 1; + } + else return 0; +} - /* OK, set flag */ - conn->singleRowMode = true; - return 1; +/* + * Select chunked results processing mode + */ +int +PQsetChunkedRowsMode(PGconn *conn, int chunkSize) +{ + if (chunkSize > 0 && canChangeResultMode(conn)) + { + conn->partialResMode = true; + conn->singleRowMode = false; + conn->maxChunkSize = chunkSize; + return 1; + } + else + return 0; } /* @@ -2117,6 +2141,20 @@ PQgetResult(PGconn *conn) case PGASYNC_READY: res = pqPrepareAsyncResult(conn); + /* + * Normally pqPrepareAsyncResult will have left conn->result + * empty. Otherwise, "res" must be a not-full PGRES_TUPLES_CHUNK + * result, which we want to return to the caller while staying in + * PGASYNC_READY state. Then the next call here will return the + * empty PGRES_TUPLES_OK result that was restored from + * saved_result, after which we can proceed. + */ + if (conn->result) + { + Assert(res->resultStatus == PGRES_TUPLES_CHUNK); + break; + } + /* Advance the queue as appropriate */ pqCommandQueueAdvance(conn, false, res->resultStatus == PGRES_PIPELINE_SYNC); @@ -3173,10 +3211,12 @@ pqPipelineProcessQueue(PGconn *conn) } /* - * Reset single-row processing mode. (Client has to set it up for each - * query, if desired.) + * Reset partial-result mode. (Client has to set it up for each query, if + * desired.) */ + conn->partialResMode = false; conn->singleRowMode = false; + conn->maxChunkSize = 0; /* * If there are no further commands to process in the queue, get us in diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index 701d58e108..3170d484f0 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -379,7 +379,8 @@ pqParseInput3(PGconn *conn) break; case PqMsg_DataRow: if (conn->result != NULL && - conn->result->resultStatus == PGRES_TUPLES_OK) + (conn->result->resultStatus == PGRES_TUPLES_OK || + conn->result->resultStatus == PGRES_TUPLES_CHUNK)) { /* Read another tuple of a normal query response */ if (getAnotherTuple(conn, msgLength)) diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index c184e85388..c0443d68fd 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -112,8 +112,9 @@ typedef enum PGRES_COPY_BOTH, /* Copy In/Out data transfer in progress */ PGRES_SINGLE_TUPLE, /* single tuple from larger resultset */ PGRES_PIPELINE_SYNC, /* pipeline synchronization point */ - PGRES_PIPELINE_ABORTED /* Command didn't run because of an abort + PGRES_PIPELINE_ABORTED, /* Command didn't run because of an abort * earlier in a pipeline */ + PGRES_TUPLES_CHUNK /* chunk of tuples from larger resultset */ } ExecStatusType; typedef enum @@ -489,6 +490,7 @@ extern int PQsendQueryPrepared(PGconn *conn, const int *paramFormats, int resultFormat); extern int PQsetSingleRowMode(PGconn *conn); +extern int PQsetChunkedRowsMode(PGconn *conn, int chunkSize); extern PGresult *PQgetResult(PGconn *conn); /* Routines for managing an asynchronous query */ diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 9c05f11a6e..113ea47c40 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -434,7 +434,10 @@ struct pg_conn bool nonblocking; /* whether this connection is using nonblock * sending semantics */ PGpipelineStatus pipelineStatus; /* status of pipeline mode */ + bool partialResMode; /* true if single-row or chunked mode */ bool singleRowMode; /* return current query result row-by-row? */ + int maxChunkSize; /* return query result in chunks not exceeding + * this number of rows */ char copy_is_binary; /* 1 = copy binary, 0 = copy text */ int copy_already_done; /* # bytes already returned in COPY OUT */ PGnotify *notifyHead; /* oldest unreported Notify msg */ @@ -535,12 +538,13 @@ struct pg_conn * and error_result is true, then we need to return a PGRES_FATAL_ERROR * result, but haven't yet constructed it; text for the error has been * appended to conn->errorMessage. (Delaying construction simplifies - * dealing with out-of-memory cases.) If next_result isn't NULL, it is a - * PGresult that will replace "result" after we return that one. + * dealing with out-of-memory cases.) If saved_result isn't NULL, it is a + * PGresult that will replace "result" after we return that one; we use + * that in partial-result mode to remember the query's tuple metadata. */ PGresult *result; /* result being constructed */ bool error_result; /* do we need to make an ERROR result? */ - PGresult *next_result; /* next result (used in single-row mode) */ + PGresult *saved_result; /* original, empty result in partialResMode */ /* Assorted state for SASL, SSL, GSS, etc */ const pg_fe_sasl_mech *sasl; diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c index b7e7a0947c..928ef6b170 100644 --- a/src/test/modules/libpq_pipeline/libpq_pipeline.c +++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c @@ -1719,6 +1719,46 @@ test_singlerowmode(PGconn *conn) if (PQgetResult(conn) != NULL) pg_fatal("expected NULL result"); + /* + * Try chunked mode as well; make sure that it correctly delivers a + * partial final chunk. + */ + if (PQsendQueryParams(conn, "SELECT generate_series(1, 5)", + 0, NULL, NULL, NULL, NULL, 0) != 1) + pg_fatal("failed to send query: %s", + PQerrorMessage(conn)); + if (PQsendFlushRequest(conn) != 1) + pg_fatal("failed to send flush request"); + if (PQsetChunkedRowsMode(conn, 3) != 1) + pg_fatal("PQsetChunkedRowsMode() failed"); + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("unexpected NULL"); + if (PQresultStatus(res) != PGRES_TUPLES_CHUNK) + pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s: %s", + PQresStatus(PQresultStatus(res)), + PQerrorMessage(conn)); + if (PQntuples(res) != 3) + pg_fatal("Expected 3 rows, got %d", PQntuples(res)); + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("unexpected NULL"); + if (PQresultStatus(res) != PGRES_TUPLES_CHUNK) + pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s", + PQresStatus(PQresultStatus(res))); + if (PQntuples(res) != 2) + pg_fatal("Expected 2 rows, got %d", PQntuples(res)); + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("unexpected NULL"); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pg_fatal("Expected PGRES_TUPLES_OK, got %s", + PQresStatus(PQresultStatus(res))); + if (PQntuples(res) != 0) + pg_fatal("Expected 0 rows, got %d", PQntuples(res)); + if (PQgetResult(conn) != NULL) + pg_fatal("expected NULL result"); + if (PQexitPipelineMode(conn) != 1) pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn)); diff --git a/src/test/modules/libpq_pipeline/traces/singlerow.trace b/src/test/modules/libpq_pipeline/traces/singlerow.trace index 83043e1407..029cd66581 100644 --- a/src/test/modules/libpq_pipeline/traces/singlerow.trace +++ b/src/test/modules/libpq_pipeline/traces/singlerow.trace @@ -56,4 +56,18 @@ B 4 BindComplete B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0 B 11 DataRow 1 1 '1' B 13 CommandComplete "SELECT 1" +F 36 Parse "" "SELECT generate_series(1, 5)" 0 +F 14 Bind "" "" 0 0 1 0 +F 6 Describe P "" +F 9 Execute "" 0 +F 4 Flush +B 4 ParseComplete +B 4 BindComplete +B 40 RowDescription 1 "generate_series" NNNN 0 NNNN 4 -1 0 +B 11 DataRow 1 1 '1' +B 11 DataRow 1 1 '2' +B 11 DataRow 1 1 '3' +B 11 DataRow 1 1 '4' +B 11 DataRow 1 1 '5' +B 13 CommandComplete "SELECT 5" F 4 Terminate