diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index 1e62d8091a..a67a836eba 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -70,6 +70,9 @@ typedef struct storeInfo AttInMetadata *attinmeta; MemoryContext tmpcontext; char **cstrs; + /* temp storage for results to avoid leaks on exception */ + PGresult *last_res; + PGresult *cur_res; } storeInfo; /* @@ -83,8 +86,8 @@ static void materializeQueryResult(FunctionCallInfo fcinfo, const char *conname, const char *sql, bool fail); -static int storeHandler(PGresult *res, const PGdataValue *columns, - const char **errmsgp, void *param); +static PGresult *storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql); +static void storeRow(storeInfo *sinfo, PGresult *res, bool first); static remoteConn *getConnectionByName(const char *name); static HTAB *createConnHash(void); static void createNewConnection(const char *name, remoteConn *rconn); @@ -630,7 +633,7 @@ dblink_send_query(PG_FUNCTION_ARGS) /* async query send */ retval = PQsendQuery(conn, sql); if (retval != 1) - elog(NOTICE, "%s", PQerrorMessage(conn)); + elog(NOTICE, "could not send query: %s", PQerrorMessage(conn)); PG_RETURN_INT32(retval); } @@ -927,8 +930,10 @@ materializeResult(FunctionCallInfo fcinfo, PGresult *res) /* * Execute the given SQL command and store its results into a tuplestore * to be returned as the result of the current function. + * * This is equivalent to PQexec followed by materializeResult, but we make - * use of libpq's "row processor" API to reduce per-row overhead. + * use of libpq's single-row mode to avoid accumulating the whole result + * inside libpq before it gets transferred to the tuplestore. */ static void materializeQueryResult(FunctionCallInfo fcinfo, @@ -944,19 +949,14 @@ materializeQueryResult(FunctionCallInfo fcinfo, /* prepTuplestoreResult must have been called previously */ Assert(rsinfo->returnMode == SFRM_Materialize); + /* initialize storeInfo to empty */ + memset(&sinfo, 0, sizeof(sinfo)); + sinfo.fcinfo = fcinfo; + PG_TRY(); { - /* initialize storeInfo to empty */ - memset(&sinfo, 0, sizeof(sinfo)); - sinfo.fcinfo = fcinfo; - - /* We'll collect tuples using storeHandler */ - PQsetRowProcessor(conn, storeHandler, &sinfo); - - res = PQexec(conn, sql); - - /* We don't keep the custom row processor installed permanently */ - PQsetRowProcessor(conn, NULL, NULL); + /* execute query, collecting any tuples into the tuplestore */ + res = storeQueryResult(&sinfo, conn, sql); if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && @@ -975,8 +975,8 @@ materializeQueryResult(FunctionCallInfo fcinfo, else if (PQresultStatus(res) == PGRES_COMMAND_OK) { /* - * storeHandler didn't get called, so we need to convert the - * command status string to a tuple manually + * storeRow didn't get called, so we need to convert the command + * status string to a tuple manually */ TupleDesc tupdesc; AttInMetadata *attinmeta; @@ -1008,25 +1008,30 @@ materializeQueryResult(FunctionCallInfo fcinfo, tuplestore_puttuple(tupstore, tuple); PQclear(res); + res = NULL; } else { Assert(PQresultStatus(res) == PGRES_TUPLES_OK); - /* storeHandler should have created a tuplestore */ + /* storeRow should have created a tuplestore */ Assert(rsinfo->setResult != NULL); PQclear(res); + res = NULL; } + PQclear(sinfo.last_res); + sinfo.last_res = NULL; + PQclear(sinfo.cur_res); + sinfo.cur_res = NULL; } PG_CATCH(); { - /* be sure to unset the custom row processor */ - PQsetRowProcessor(conn, NULL, NULL); /* be sure to release any libpq result we collected */ - if (res) - PQclear(res); + PQclear(res); + PQclear(sinfo.last_res); + PQclear(sinfo.cur_res); /* and clear out any pending data in libpq */ - while ((res = PQskipResult(conn)) != NULL) + while ((res = PQgetResult(conn)) != NULL) PQclear(res); PG_RE_THROW(); } @@ -1034,23 +1039,72 @@ materializeQueryResult(FunctionCallInfo fcinfo, } /* - * Custom row processor for materializeQueryResult. - * Prototype of this function must match PQrowProcessor. + * Execute query, and send any result rows to sinfo->tuplestore. */ -static int -storeHandler(PGresult *res, const PGdataValue *columns, - const char **errmsgp, void *param) +static PGresult * +storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql) +{ + bool first = true; + PGresult *res; + + if (!PQsendQuery(conn, sql)) + elog(ERROR, "could not send query: %s", PQerrorMessage(conn)); + + if (!PQsetSingleRowMode(conn)) /* shouldn't fail */ + elog(ERROR, "failed to set single-row mode for dblink query"); + + for (;;) + { + CHECK_FOR_INTERRUPTS(); + + sinfo->cur_res = PQgetResult(conn); + if (!sinfo->cur_res) + break; + + if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE) + { + /* got one row from possibly-bigger resultset */ + storeRow(sinfo, sinfo->cur_res, first); + + PQclear(sinfo->cur_res); + sinfo->cur_res = NULL; + first = false; + } + else + { + /* if empty resultset, fill tuplestore header */ + if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK) + storeRow(sinfo, sinfo->cur_res, first); + + /* store completed result at last_res */ + PQclear(sinfo->last_res); + sinfo->last_res = sinfo->cur_res; + sinfo->cur_res = NULL; + first = true; + } + } + + /* return last_res */ + res = sinfo->last_res; + sinfo->last_res = NULL; + return res; +} + +/* + * Send single row to sinfo->tuplestore. + * + * If "first" is true, create the tuplestore using PGresult's metadata + * (in this case the PGresult might contain either zero or one row). + */ +static void +storeRow(storeInfo *sinfo, PGresult *res, bool first) { - storeInfo *sinfo = (storeInfo *) param; int nfields = PQnfields(res); - char **cstrs = sinfo->cstrs; HeapTuple tuple; - char *pbuf; - int pbuflen; int i; MemoryContext oldcontext; - if (columns == NULL) + if (first) { /* Prepare for new result set */ ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo; @@ -1098,13 +1152,16 @@ storeHandler(PGresult *res, const PGdataValue *columns, sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc); /* Create a new, empty tuplestore */ - oldcontext = MemoryContextSwitchTo( - rsinfo->econtext->ecxt_per_query_memory); + oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory); sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem); rsinfo->setResult = sinfo->tuplestore; rsinfo->setDesc = tupdesc; MemoryContextSwitchTo(oldcontext); + /* Done if empty resultset */ + if (PQntuples(res) == 0) + return; + /* * Set up sufficiently-wide string pointers array; this won't change * in size so it's easy to preallocate. @@ -1121,11 +1178,10 @@ storeHandler(PGresult *res, const PGdataValue *columns, ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); - - return 1; } - CHECK_FOR_INTERRUPTS(); + /* Should have a single-row result if we get here */ + Assert(PQntuples(res) == 1); /* * Do the following work in a temp context that we reset after each tuple. @@ -1135,46 +1191,24 @@ storeHandler(PGresult *res, const PGdataValue *columns, oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext); /* - * The strings passed to us are not null-terminated, but the datatype - * input functions we're about to call require null termination. Copy the - * strings and add null termination. As a micro-optimization, allocate - * all the strings with one palloc. + * Fill cstrs with null-terminated strings of column values. */ - pbuflen = nfields; /* count the null terminators themselves */ for (i = 0; i < nfields; i++) { - int len = columns[i].len; - - if (len > 0) - pbuflen += len; - } - pbuf = (char *) palloc(pbuflen); - - for (i = 0; i < nfields; i++) - { - int len = columns[i].len; - - if (len < 0) - cstrs[i] = NULL; + if (PQgetisnull(res, 0, i)) + sinfo->cstrs[i] = NULL; else - { - cstrs[i] = pbuf; - memcpy(pbuf, columns[i].value, len); - pbuf += len; - *pbuf++ = '\0'; - } + sinfo->cstrs[i] = PQgetvalue(res, 0, i); } /* Convert row to a tuple, and add it to the tuplestore */ - tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs); + tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs); tuplestore_puttuple(sinfo->tuplestore, tuple); /* Clean up */ MemoryContextSwitchTo(oldcontext); MemoryContextReset(sinfo->tmpcontext); - - return 1; } /* diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index 5c5dd68db3..255c5c1abb 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -2418,14 +2418,28 @@ ExecStatusType PQresultStatus(const PGresult *res); PGRES_COPY_BOTH - Copy In/Out (to and from server) data transfer started. This is - currently used only for streaming replication. + Copy In/Out (to and from server) data transfer started. This + feature is currently used only for streaming replication, + so this status should not occur in ordinary applications. + + + + + + PGRES_SINGLE_TUPLE + + + The PGresult contains a single result tuple + from the current command. This status occurs only when + single-row mode has been selected for the query + (see ). - If the result status is PGRES_TUPLES_OK, then + If the result status is PGRES_TUPLES_OK or + PGRES_SINGLE_TUPLE, then the functions described below can be used to retrieve the rows returned by the query. Note that a SELECT command that happens to retrieve zero rows still shows @@ -2726,7 +2740,8 @@ void PQclear(PGresult *res); These functions are used to extract information from a PGresult object that represents a successful query result (that is, one that has status - PGRES_TUPLES_OK). They can also be used to extract + PGRES_TUPLES_OK or PGRES_SINGLE_TUPLE). + They can also be used to extract information from a successful Describe operation: a Describe's result has all the same column information that actual execution of the query would provide, but it has zero rows. For objects with other status values, @@ -3738,7 +3753,7 @@ unsigned char *PQunescapeBytea(const unsigned char *from, size_t *to_length); The PQexec function is adequate for submitting - commands in normal, synchronous applications. It has a couple of + commands in normal, synchronous applications. It has a few deficiencies, however, that can be of importance to some users: @@ -3769,6 +3784,15 @@ unsigned char *PQunescapeBytea(const unsigned char *from, size_t *to_length); PQexec. + + + + PQexec always collects the command's entire result, + buffering it in a single PGresult. While + this simplifies error-handling logic for the application, it can be + impractical for results containing many rows. + + @@ -3984,8 +4008,11 @@ int PQsendDescribePortal(PGconn *conn, const char *portalName); Waits for the next result from a prior PQsendQuery, PQsendQueryParams, - PQsendPrepare, or - PQsendQueryPrepared call, and returns it. + PQsendPrepare, + PQsendQueryPrepared, + PQsendDescribePrepared, or + PQsendDescribePortal + call, and returns it. A null pointer is returned when the command is complete and there will be no more results. @@ -4012,7 +4039,7 @@ PGresult *PQgetResult(PGconn *conn); Even when PQresultStatus indicates a fatal error, PQgetResult should be called until it - returns a null pointer to allow libpq to + returns a null pointer, to allow libpq to process the error information completely. @@ -4029,7 +4056,18 @@ PGresult *PQgetResult(PGconn *conn); can be obtained individually. (This allows a simple form of overlapped processing, by the way: the client can be handling the results of one command while the server is still working on later queries in the same - command string.) However, calling PQgetResult + command string.) + + + + Another frequently-desired feature that can be obtained with + PQsendQuery and PQgetResult + is retrieving large query results a row at a time. This is discussed + in . + + + + By itself, calling PQgetResult will still cause the client to block until the server completes the next SQL command. This can be avoided by proper use of two more functions: @@ -4238,6 +4276,98 @@ int PQflush(PGconn *conn); + + Retrieving Query Results Row-By-Row + + + libpq + single-row mode + + + + Ordinarily, libpq collects a SQL command's + entire result and returns it to the application as a single + PGresult. This can be unworkable for commands + that return a large number of rows. For such cases, applications can use + PQsendQuery and PQgetResult in + single-row mode. In this mode, the result row(s) are + returned to the application one at a time, as they are received from the + server. + + + + To enter single-row mode, call PQsetSingleRowMode + immediately after a successful call of PQsendQuery + (or a sibling function). This mode selection is effective only for the + currently executing query. Then call PQgetResult + repeatedly, until it returns null, as documented in . If the query returns any rows, they are returned + as individual PGresult objects, which look like + normal query results except for having status code + PGRES_SINGLE_TUPLE instead of + PGRES_TUPLES_OK. After the last row, or immediately if + the query returns zero rows, a zero-row object with status + PGRES_TUPLES_OK is returned; this is the signal that no + more rows will arrive. (But note that it is still necessary to continue + calling PQgetResult until it returns null.) All of + these PGresult objects will contain the same row + description data (column names, types, etc) that an ordinary + PGresult object for the query would have. + Each object should be freed with PQclear as usual. + + + + + + + PQsetSingleRowMode + + PQsetSingleRowMode + + + + + + Select single-row mode for the currently-executing query. + + +int PQsetSingleRowMode(PGconn *conn); + + + + + This function can only be called immediately after + PQsendQuery or one of its sibling functions, + before any other operation on the connection such as + PQconsumeInput or + PQgetResult. If called at the correct time, + the function activates single-row mode for the current query and + returns 1. Otherwise the mode stays unchanged and the function + returns 0. In any case, the mode reverts to normal after + completion of the current query. + + + + + + + + + While processing a query, the server may return some rows and then + encounter an error, causing the query to be aborted. Ordinarily, + libpq discards any such rows and reports only the + error. But in single-row mode, those rows will have already been + returned to the application. Hence, the application will see some + PGRES_SINGLE_TUPLE PGresult + objects followed by a PGRES_FATAL_ERROR object. For + proper transactional behavior, the application must be designed to + discard or undo whatever has been done with the previously-processed + rows, if the query ultimately fails. + + + + + Canceling Queries in Progress @@ -5700,274 +5830,6 @@ defaultNoticeProcessor(void *arg, const char *message) - - Custom Row Processing - - - PQrowProcessor - - - - row processor - in libpq - - - - Ordinarily, when receiving a query result from the server, - libpq adds each row value to the current - PGresult until the entire result set is received; then - the PGresult is returned to the application as a unit. - This approach is simple to work with, but becomes inefficient for large - result sets. To improve performance, an application can register a - custom row processor function that processes each row - as the data is received from the network. The custom row processor could - process the data fully, or store it into some application-specific data - structure for later processing. - - - - - The row processor function sees the rows before it is known whether the - query will succeed overall, since the server might return some rows before - encountering an error. For proper transactional behavior, it must be - possible to discard or undo whatever the row processor has done, if the - query ultimately fails. - - - - - When using a custom row processor, row data is not accumulated into the - PGresult, so the PGresult ultimately delivered to - the application will contain no rows (PQntuples = - 0). However, it still has PQresultStatus = - PGRES_TUPLES_OK, and it contains correct information about the - set of columns in the query result. On the other hand, if the query fails - partway through, the returned PGresult has - PQresultStatus = PGRES_FATAL_ERROR. The - application must be prepared to undo any actions of the row processor - whenever it gets a PGRES_FATAL_ERROR result. - - - - A custom row processor is registered for a particular connection by - calling PQsetRowProcessor, described below. - This row processor will be used for all subsequent query results on that - connection until changed again. A row processor function must have a - signature matching - - -typedef int (*PQrowProcessor) (PGresult *res, const PGdataValue *columns, - const char **errmsgp, void *param); - - where PGdataValue is described by - -typedef struct pgDataValue -{ - int len; /* data length in bytes, or <0 if NULL */ - const char *value; /* data value, without zero-termination */ -} PGdataValue; - - - - - The res parameter is the PGRES_TUPLES_OK - PGresult that will eventually be delivered to the calling - application (if no error intervenes). It contains information about - the set of columns in the query result, but no row data. In particular the - row processor must fetch PQnfields(res) to know the number of - data columns. - - - - Immediately after libpq has determined the result set's - column information, it will make a call to the row processor with - columns set to NULL, but the other parameters as - usual. The row processor can use this call to initialize for a new result - set; if it has nothing to do, it can just return 1. In - subsequent calls, one per received row, columns - is non-NULL and points to an array of PGdataValue structs, one per - data column. - - - - errmsgp is an output parameter used only for error - reporting. If the row processor needs to report an error, it can set - *errmsgp to point to a suitable message - string (and then return -1). As a special case, returning - -1 without changing *errmsgp - from its initial value of NULL is taken to mean out of memory. - - - - The last parameter, param, is just a void pointer - passed through from PQsetRowProcessor. This can be - used for communication between the row processor function and the - surrounding application. - - - - In the PGdataValue array passed to a row processor, data values - cannot be assumed to be zero-terminated, whether the data format is text - or binary. A SQL NULL value is indicated by a negative length field. - - - - The row processor must process the row data values - immediately, or else copy them into application-controlled storage. - The value pointers passed to the row processor point into - libpq's internal data input buffer, which will be - overwritten by the next packet fetch. - - - - The row processor function must return either 1 or - -1. - 1 is the normal, successful result value; libpq - will continue with receiving row values from the server and passing them to - the row processor. -1 indicates that the row processor has - encountered an error. In that case, - libpq will discard all remaining rows in the result set - and then return a PGRES_FATAL_ERROR PGresult to - the application (containing the specified error message, or out of - memory for query result if *errmsgp - was left as NULL). - - - - Another option for exiting a row processor is to throw an exception using - C's longjmp() or C++'s throw. If this is done, - processing of the incoming data can be resumed later by calling - PQgetResult; the row processor will be invoked as normal for - any remaining rows in the current result. - As with any usage of PQgetResult, the application - should continue calling PQgetResult until it gets a NULL - result before issuing any new query. - - - - In some cases, an exception may mean that the remainder of the - query result is not interesting. In such cases the application can discard - the remaining rows with PQskipResult, described below. - Another possible recovery option is to close the connection altogether with - PQfinish. - - - - - - - PQsetRowProcessor - - PQsetRowProcessor - - - - - - Sets a callback function to process each row. - - -void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param); - - - - - The specified row processor function func is installed as - the active row processor for the given connection conn. - Also, param is installed as the passthrough pointer to - pass to it. Alternatively, if func is NULL, the standard - row processor is reinstalled on the given connection (and - param is ignored). - - - - Although the row processor can be changed at any time in the life of a - connection, it's generally unwise to do so while a query is active. - In particular, when using asynchronous mode, be aware that both - PQisBusy and PQgetResult can call the current - row processor. - - - - - - - PQgetRowProcessor - - PQgetRowProcessor - - - - - - Fetches the current row processor for the specified connection. - - -PQrowProcessor PQgetRowProcessor(const PGconn *conn, void **param); - - - - - In addition to returning the row processor function pointer, the - current passthrough pointer will be returned at - *param, if param is not NULL. - - - - - - - PQskipResult - - PQskipResult - - - - - - Discard all the remaining rows in the incoming result set. - - -PGresult *PQskipResult(PGconn *conn); - - - - - This is a simple convenience function to discard incoming data after a - row processor has failed or it's determined that the rest of the result - set is not interesting. PQskipResult is exactly - equivalent to PQgetResult except that it transiently - installs a dummy row processor function that just discards data. - The returned PGresult can be discarded without further ado - if it has status PGRES_TUPLES_OK; but other status values - should be handled normally. (In particular, - PGRES_FATAL_ERROR indicates a server-reported error that - will still need to be dealt with.) - As when using PQgetResult, one should usually repeat the - call until NULL is returned to ensure the connection has reached an - idle state. Another possible usage is to call - PQskipResult just once, and then resume using - PQgetResult to process subsequent result sets normally. - - - - Because PQskipResult will wait for server input, it is not - very useful in asynchronous applications. In particular you should not - code a loop of PQisBusy and PQskipResult, - because that will result in the installed row processor being called - within PQisBusy. To get the proper behavior in an - asynchronous application, you'll need to install a dummy row processor - (or set a flag to make your normal row processor do nothing) and leave - it that way until you have discarded all incoming data via your normal - PQisBusy and PQgetResult loop. - - - - - - - - Event System diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index 1251455f1f..9d95e262be 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -160,6 +160,4 @@ PQconnectStartParams 157 PQping 158 PQpingParams 159 PQlibVersion 160 -PQsetRowProcessor 161 -PQgetRowProcessor 162 -PQskipResult 163 +PQsetSingleRowMode 161 diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index a32258a8cb..adaab7aaad 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -2709,8 +2709,7 @@ makeEmptyPGconn(void) /* Zero all pointers and booleans */ MemSet(conn, 0, sizeof(PGconn)); - /* install default row processor and notice hooks */ - PQsetRowProcessor(conn, NULL, NULL); + /* install default notice hooks */ conn->noticeHooks.noticeRec = defaultNoticeReceiver; conn->noticeHooks.noticeProc = defaultNoticeProcessor; @@ -4658,7 +4657,7 @@ conninfo_uri_parse_options(PQconninfoOption *options, const char *uri, if (p == host) { printfPQExpBuffer(errorMessage, - libpq_gettext("IPv6 host address may not be empty in URI: \"%s\"\n"), + libpq_gettext("IPv6 host address may not be empty in URI: \"%s\"\n"), uri); goto cleanup; } @@ -4878,7 +4877,7 @@ conninfo_uri_parse_params(char *params, printfPQExpBuffer(errorMessage, libpq_gettext( - "invalid URI query parameter: \"%s\"\n"), + "invalid URI query parameter: \"%s\"\n"), keyword); return false; } @@ -4943,7 +4942,7 @@ conninfo_uri_decode(const char *str, PQExpBuffer errorMessage) if (!(get_hexdigit(*q++, &hi) && get_hexdigit(*q++, &lo))) { printfPQExpBuffer(errorMessage, - libpq_gettext("invalid percent-encoded token: \"%s\"\n"), + libpq_gettext("invalid percent-encoded token: \"%s\"\n"), str); free(buf); return NULL; @@ -5594,8 +5593,8 @@ static void dot_pg_pass_warning(PGconn *conn) { /* If it was 'invalid authorization', add .pgpass mention */ - if (conn->dot_pgpass_used && conn->password_needed && conn->result && /* only works with >= 9.0 servers */ + if (conn->dot_pgpass_used && conn->password_needed && conn->result && strcmp(PQresultErrorField(conn->result, PG_DIAG_SQLSTATE), ERRCODE_INVALID_PASSWORD) == 0) { diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index badc0b32a8..53516db723 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -38,7 +38,8 @@ char *const pgresStatus[] = { "PGRES_BAD_RESPONSE", "PGRES_NONFATAL_ERROR", "PGRES_FATAL_ERROR", - "PGRES_COPY_BOTH" + "PGRES_COPY_BOTH", + "PGRES_SINGLE_TUPLE" }; /* @@ -51,8 +52,6 @@ static bool static_std_strings = false; static PGEvent *dupEvents(PGEvent *events, int count); static bool pqAddTuple(PGresult *res, PGresAttValue *tup); -static int pqStdRowProcessor(PGresult *res, const PGdataValue *columns, - const char **errmsgp, void *param); static bool PQsendQueryStart(PGconn *conn); static int PQsendQueryGuts(PGconn *conn, const char *command, @@ -64,8 +63,6 @@ static int PQsendQueryGuts(PGconn *conn, const int *paramFormats, int resultFormat); static void parseInput(PGconn *conn); -static int dummyRowProcessor(PGresult *res, const PGdataValue *columns, - const char **errmsgp, void *param); static bool PQexecStart(PGconn *conn); static PGresult *PQexecFinish(PGconn *conn); static int PQsendDescribe(PGconn *conn, char desc_type, @@ -181,6 +178,7 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status) case PGRES_COPY_OUT: case PGRES_COPY_IN: case PGRES_COPY_BOTH: + case PGRES_SINGLE_TUPLE: /* non-error cases */ break; default: @@ -698,6 +696,8 @@ PQclear(PGresult *res) /* * Handy subroutine to deallocate any partially constructed async result. + * + * Any "next" result gets cleared too. */ void pqClearAsyncResult(PGconn *conn) @@ -705,6 +705,9 @@ pqClearAsyncResult(PGconn *conn) if (conn->result) PQclear(conn->result); conn->result = NULL; + if (conn->next_result) + PQclear(conn->next_result); + conn->next_result = NULL; } /* @@ -758,7 +761,6 @@ pqPrepareAsyncResult(PGconn *conn) * conn->errorMessage. */ res = conn->result; - conn->result = NULL; /* handing over ownership to caller */ if (!res) res = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR); else @@ -771,6 +773,16 @@ pqPrepareAsyncResult(PGconn *conn) appendPQExpBufferStr(&conn->errorMessage, PQresultErrorMessage(res)); } + + /* + * Replace conn->result with next_result, if any. In the normal case + * there isn't a next result and we're just dropping ownership of the + * current result. In single-row mode this restores the situation to what + * it was before we created the current single-row result. + */ + conn->result = conn->next_result; + conn->next_result = NULL; + return res; } @@ -981,85 +993,55 @@ pqSaveParameterStatus(PGconn *conn, const char *name, const char *value) /* - * PQsetRowProcessor - * Set function that copies row data out from the network buffer, - * along with a passthrough parameter for it. - */ -void -PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param) -{ - if (!conn) - return; - - if (func) - { - /* set custom row processor */ - conn->rowProcessor = func; - conn->rowProcessorParam = param; - } - else - { - /* set default row processor */ - conn->rowProcessor = pqStdRowProcessor; - conn->rowProcessorParam = conn; - } -} - -/* - * PQgetRowProcessor - * Get current row processor of PGconn. - * If param is not NULL, also store the passthrough parameter at *param. - */ -PQrowProcessor -PQgetRowProcessor(const PGconn *conn, void **param) -{ - if (!conn) - { - if (param) - *param = NULL; - return NULL; - } - - if (param) - *param = conn->rowProcessorParam; - return conn->rowProcessor; -} - -/* - * pqStdRowProcessor - * Add the received row to the PGresult structure - * Returns 1 if OK, -1 if error occurred. + * pqRowProcessor + * Add the received row to the current async result (conn->result). + * Returns 1 if OK, 0 if error occurred. * - * Note: "param" should point to the PGconn, but we don't actually need that - * as of the current coding. + * On error, *errmsgp can be set to an error string to be returned. + * If it is left NULL, the error is presumed to be "out of memory". + * + * In single-row mode, we create a new result holding just the current row, + * stashing the previous result in conn->next_result so that it becomes + * active again after pqPrepareAsyncResult(). This allows the result metadata + * (column descriptions) to be carried forward to each result row. */ -static int -pqStdRowProcessor(PGresult *res, const PGdataValue *columns, - const char **errmsgp, void *param) +int +pqRowProcessor(PGconn *conn, const char **errmsgp) { + PGresult *res = conn->result; int nfields = res->numAttributes; + const PGdataValue *columns = conn->rowBuf; PGresAttValue *tup; int i; - if (columns == NULL) + /* + * In single-row mode, make a new PGresult that will hold just this one + * row; the original conn->result is left unchanged so that it can be used + * again as the template for future rows. + */ + if (conn->singleRowMode) { - /* New result set ... we have nothing to do in this function. */ - return 1; + /* Copy everything that should be in the result at this point */ + res = PQcopyResult(res, + PG_COPYRES_ATTRS | PG_COPYRES_EVENTS | + PG_COPYRES_NOTICEHOOKS); + if (!res) + return 0; } /* * Basically we just allocate space in the PGresult for each field and * copy the data over. * - * Note: on malloc failure, we return -1 leaving *errmsgp still NULL, - * which caller will take to mean "out of memory". This is preferable to - * trying to set up such a message here, because evidently there's not - * enough memory for gettext() to do anything. + * Note: on malloc failure, we return 0 leaving *errmsgp still NULL, which + * caller will take to mean "out of memory". This is preferable to trying + * to set up such a message here, because evidently there's not enough + * memory for gettext() to do anything. */ tup = (PGresAttValue *) pqResultAlloc(res, nfields * sizeof(PGresAttValue), TRUE); if (tup == NULL) - return -1; + goto fail; for (i = 0; i < nfields; i++) { @@ -1078,7 +1060,7 @@ pqStdRowProcessor(PGresult *res, const PGdataValue *columns, val = (char *) pqResultAlloc(res, clen + 1, isbinary); if (val == NULL) - return -1; + goto fail; /* copy and zero-terminate the data (even if it's binary) */ memcpy(val, columns[i].value, clen); @@ -1091,10 +1073,30 @@ pqStdRowProcessor(PGresult *res, const PGdataValue *columns, /* And add the tuple to the PGresult's tuple array */ if (!pqAddTuple(res, tup)) - return -1; + goto fail; + + /* + * Success. In single-row mode, make the result available to the client + * immediately. + */ + if (conn->singleRowMode) + { + /* Change result status to special single-row value */ + res->resultStatus = PGRES_SINGLE_TUPLE; + /* Stash old result for re-use later */ + conn->next_result = conn->result; + conn->result = res; + /* And mark the result ready to return */ + conn->asyncStatus = PGASYNC_READY; + } - /* Success */ return 1; + +fail: + /* release locally allocated PGresult, if we made one */ + if (res != conn->result) + PQclear(res); + return 0; } @@ -1343,6 +1345,10 @@ PQsendQueryStart(PGconn *conn) /* initialize async result-accumulation state */ conn->result = NULL; + conn->next_result = NULL; + + /* reset single-row processing mode */ + conn->singleRowMode = false; /* ready to send command message */ return true; @@ -1547,6 +1553,31 @@ pqHandleSendFailure(PGconn *conn) parseInput(conn); } +/* + * Select row-by-row processing mode + */ +int +PQsetSingleRowMode(PGconn *conn) +{ + /* + * Only allow setting the flag when we have launched a query and not yet + * received any results. + */ + if (!conn) + return 0; + if (conn->asyncStatus != PGASYNC_BUSY) + return 0; + if (conn->queryclass != PGQUERY_SIMPLE && + conn->queryclass != PGQUERY_EXTENDED) + return 0; + if (conn->result) + return 0; + + /* OK, set flag */ + conn->singleRowMode = true; + return 1; +} + /* * Consume any available input from the backend * 0 return: some kind of trouble @@ -1587,9 +1618,6 @@ PQconsumeInput(PGconn *conn) * parseInput: if appropriate, parse input data from backend * until input is exhausted or a stopping state is reached. * Note that this function will NOT attempt to read more data from the backend. - * - * Note: callers of parseInput must be prepared for a longjmp exit when we are - * in PGASYNC_BUSY state, since an external row processor might do that. */ static void parseInput(PGconn *conn) @@ -1737,49 +1765,6 @@ PQgetResult(PGconn *conn) return res; } -/* - * PQskipResult - * Get the next PGresult produced by a query, but discard any data rows. - * - * This is mainly useful for cleaning up after a longjmp out of a row - * processor, when resuming processing of the current query result isn't - * wanted. Note that this is of little value in an async-style application, - * since any preceding calls to PQisBusy would have already called the regular - * row processor. - */ -PGresult * -PQskipResult(PGconn *conn) -{ - PGresult *res; - PQrowProcessor savedRowProcessor; - - if (!conn) - return NULL; - - /* temporarily install dummy row processor */ - savedRowProcessor = conn->rowProcessor; - conn->rowProcessor = dummyRowProcessor; - /* no need to save/change rowProcessorParam */ - - /* fetch the next result */ - res = PQgetResult(conn); - - /* restore previous row processor */ - conn->rowProcessor = savedRowProcessor; - - return res; -} - -/* - * Do-nothing row processor for PQskipResult - */ -static int -dummyRowProcessor(PGresult *res, const PGdataValue *columns, - const char **errmsgp, void *param) -{ - return 1; -} - /* * PQexec @@ -1886,7 +1871,7 @@ PQexecStart(PGconn *conn) * Silently discard any prior query result that application didn't eat. * This is probably poor design, but it's here for backward compatibility. */ - while ((result = PQskipResult(conn)) != NULL) + while ((result = PQgetResult(conn)) != NULL) { ExecStatusType resultStatus = result->resultStatus; diff --git a/src/interfaces/libpq/fe-lobj.c b/src/interfaces/libpq/fe-lobj.c index 13fd98c2f9..f3a6d0341c 100644 --- a/src/interfaces/libpq/fe-lobj.c +++ b/src/interfaces/libpq/fe-lobj.c @@ -682,8 +682,6 @@ lo_initialize(PGconn *conn) int n; const char *query; const char *fname; - PQrowProcessor savedRowProcessor; - void *savedRowProcessorParam; Oid foid; if (!conn) @@ -732,16 +730,7 @@ lo_initialize(PGconn *conn) "or proname = 'loread' " "or proname = 'lowrite'"; - /* Ensure the standard row processor is used to collect the result */ - savedRowProcessor = conn->rowProcessor; - savedRowProcessorParam = conn->rowProcessorParam; - PQsetRowProcessor(conn, NULL, NULL); - res = PQexec(conn, query); - - conn->rowProcessor = savedRowProcessor; - conn->rowProcessorParam = savedRowProcessorParam; - if (res == NULL) { free(lobjfuncs); diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c index 8dbd6b6982..1ba5885cd3 100644 --- a/src/interfaces/libpq/fe-protocol2.c +++ b/src/interfaces/libpq/fe-protocol2.c @@ -49,19 +49,11 @@ static int getNotify(PGconn *conn); PostgresPollingStatusType pqSetenvPoll(PGconn *conn) { - PostgresPollingStatusType result; PGresult *res; - PQrowProcessor savedRowProcessor; - void *savedRowProcessorParam; if (conn == NULL || conn->status == CONNECTION_BAD) return PGRES_POLLING_FAILED; - /* Ensure the standard row processor is used to collect any results */ - savedRowProcessor = conn->rowProcessor; - savedRowProcessorParam = conn->rowProcessorParam; - PQsetRowProcessor(conn, NULL, NULL); - /* Check whether there are any data for us */ switch (conn->setenv_state) { @@ -77,10 +69,7 @@ pqSetenvPoll(PGconn *conn) if (n < 0) goto error_return; if (n == 0) - { - result = PGRES_POLLING_READING; - goto normal_return; - } + return PGRES_POLLING_READING; break; } @@ -94,8 +83,7 @@ pqSetenvPoll(PGconn *conn) /* Should we raise an error if called when not active? */ case SETENV_STATE_IDLE: - result = PGRES_POLLING_OK; - goto normal_return; + return PGRES_POLLING_OK; default: printfPQExpBuffer(&conn->errorMessage, @@ -192,10 +180,7 @@ pqSetenvPoll(PGconn *conn) case SETENV_STATE_CLIENT_ENCODING_WAIT: { if (PQisBusy(conn)) - { - result = PGRES_POLLING_READING; - goto normal_return; - } + return PGRES_POLLING_READING; res = PQgetResult(conn); @@ -220,10 +205,7 @@ pqSetenvPoll(PGconn *conn) case SETENV_STATE_OPTION_WAIT: { if (PQisBusy(conn)) - { - result = PGRES_POLLING_READING; - goto normal_return; - } + return PGRES_POLLING_READING; res = PQgetResult(conn); @@ -262,17 +244,13 @@ pqSetenvPoll(PGconn *conn) goto error_return; conn->setenv_state = SETENV_STATE_QUERY1_WAIT; - result = PGRES_POLLING_READING; - goto normal_return; + return PGRES_POLLING_READING; } case SETENV_STATE_QUERY1_WAIT: { if (PQisBusy(conn)) - { - result = PGRES_POLLING_READING; - goto normal_return; - } + return PGRES_POLLING_READING; res = PQgetResult(conn); @@ -349,17 +327,13 @@ pqSetenvPoll(PGconn *conn) goto error_return; conn->setenv_state = SETENV_STATE_QUERY2_WAIT; - result = PGRES_POLLING_READING; - goto normal_return; + return PGRES_POLLING_READING; } case SETENV_STATE_QUERY2_WAIT: { if (PQisBusy(conn)) - { - result = PGRES_POLLING_READING; - goto normal_return; - } + return PGRES_POLLING_READING; res = PQgetResult(conn); @@ -406,8 +380,7 @@ pqSetenvPoll(PGconn *conn) { /* Query finished, so we're done */ conn->setenv_state = SETENV_STATE_IDLE; - result = PGRES_POLLING_OK; - goto normal_return; + return PGRES_POLLING_OK; } break; } @@ -425,12 +398,7 @@ pqSetenvPoll(PGconn *conn) error_return: conn->setenv_state = SETENV_STATE_IDLE; - result = PGRES_POLLING_FAILED; - -normal_return: - conn->rowProcessor = savedRowProcessor; - conn->rowProcessorParam = savedRowProcessorParam; - return result; + return PGRES_POLLING_FAILED; } @@ -438,9 +406,6 @@ normal_return: * parseInput: if appropriate, parse input data from backend * until input is exhausted or a stopping state is reached. * Note that this function will NOT attempt to read more data from the backend. - * - * Note: callers of parseInput must be prepared for a longjmp exit when we are - * in PGASYNC_BUSY state, since an external row processor might do that. */ void pqParseInput2(PGconn *conn) @@ -746,31 +711,16 @@ getRowDescriptions(PGconn *conn) /* Success! */ conn->result = result; - /* - * Advance inStart to show that the "T" message has been processed. We - * must do this before calling the row processor, in case it longjmps. - */ + /* Advance inStart to show that the "T" message has been processed. */ conn->inStart = conn->inCursor; - /* Give the row processor a chance to initialize for new result set */ - errmsg = NULL; - switch ((*conn->rowProcessor) (result, NULL, &errmsg, - conn->rowProcessorParam)) - { - case 1: - /* everything is good */ - return 0; + /* + * We could perform additional setup for the new result set here, but for + * now there's nothing else to do. + */ - case -1: - /* error, report the errmsg below */ - break; - - default: - /* unrecognized return code */ - errmsg = libpq_gettext("unrecognized return value from row processor"); - break; - } - goto set_error_result; + /* And we're done. */ + return 0; advance_and_error: @@ -781,8 +731,6 @@ advance_and_error: */ conn->inStart = conn->inEnd; -set_error_result: - /* * Replace partially constructed result with an error result. First * discard the old result to try to win back some memory. @@ -790,7 +738,7 @@ set_error_result: pqClearAsyncResult(conn); /* - * If row processor didn't provide an error message, assume "out of + * If preceding code didn't provide an error message, assume "out of * memory" was meant. The advantage of having this special case is that * freeing the old result first greatly improves the odds that gettext() * will succeed in providing a translation. @@ -937,31 +885,15 @@ getAnotherTuple(PGconn *conn, bool binary) free(bitmap); bitmap = NULL; - /* - * Advance inStart to show that the "D" message has been processed. We - * must do this before calling the row processor, in case it longjmps. - */ + /* Advance inStart to show that the "D" message has been processed. */ conn->inStart = conn->inCursor; - /* Pass the completed row values to rowProcessor */ + /* Process the collected row */ errmsg = NULL; - switch ((*conn->rowProcessor) (result, rowbuf, &errmsg, - conn->rowProcessorParam)) - { - case 1: - /* everything is good */ - return 0; + if (pqRowProcessor(conn, &errmsg)) + return 0; /* normal, successful exit */ - case -1: - /* error, report the errmsg below */ - break; - - default: - /* unrecognized return code */ - errmsg = libpq_gettext("unrecognized return value from row processor"); - break; - } - goto set_error_result; + goto set_error_result; /* pqRowProcessor failed, report it */ advance_and_error: @@ -981,7 +913,7 @@ set_error_result: pqClearAsyncResult(conn); /* - * If row processor didn't provide an error message, assume "out of + * If preceding code didn't provide an error message, assume "out of * memory" was meant. The advantage of having this special case is that * freeing the old result first greatly improves the odds that gettext() * will succeed in providing a translation. diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index 173af2e0a7..d289f82285 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -61,9 +61,6 @@ static int build_startup_packet(const PGconn *conn, char *packet, * parseInput: if appropriate, parse input data from backend * until input is exhausted or a stopping state is reached. * Note that this function will NOT attempt to read more data from the backend. - * - * Note: callers of parseInput must be prepared for a longjmp exit when we are - * in PGASYNC_BUSY state, since an external row processor might do that. */ void pqParseInput3(PGconn *conn) @@ -446,10 +443,6 @@ handleSyncLoss(PGconn *conn, char id, int msgLength) * Returns: 0 if processed message successfully, EOF to suspend parsing * (the latter case is not actually used currently). * In either case, conn->inStart has been advanced past the message. - * - * Note: the row processor could also choose to longjmp out of libpq, - * in which case the library's state must allow for resumption at the - * next message. */ static int getRowDescriptions(PGconn *conn, int msgLength) @@ -564,10 +557,7 @@ getRowDescriptions(PGconn *conn, int msgLength) /* Success! */ conn->result = result; - /* - * Advance inStart to show that the "T" message has been processed. We - * must do this before calling the row processor, in case it longjmps. - */ + /* Advance inStart to show that the "T" message has been processed. */ conn->inStart = conn->inCursor; /* @@ -580,25 +570,13 @@ getRowDescriptions(PGconn *conn, int msgLength) return 0; } - /* Give the row processor a chance to initialize for new result set */ - errmsg = NULL; - switch ((*conn->rowProcessor) (result, NULL, &errmsg, - conn->rowProcessorParam)) - { - case 1: - /* everything is good */ - return 0; + /* + * We could perform additional setup for the new result set here, but for + * now there's nothing else to do. + */ - case -1: - /* error, report the errmsg below */ - break; - - default: - /* unrecognized return code */ - errmsg = libpq_gettext("unrecognized return value from row processor"); - break; - } - goto set_error_result; + /* And we're done. */ + return 0; advance_and_error: /* Discard unsaved result, if any */ @@ -608,8 +586,6 @@ advance_and_error: /* Discard the failed message by pretending we read it */ conn->inStart += 5 + msgLength; -set_error_result: - /* * Replace partially constructed result with an error result. First * discard the old result to try to win back some memory. @@ -617,8 +593,10 @@ set_error_result: pqClearAsyncResult(conn); /* - * If row processor didn't provide an error message, assume "out of - * memory" was meant. + * If preceding code didn't provide an error message, assume "out of + * memory" was meant. The advantage of having this special case is that + * freeing the old result first greatly improves the odds that gettext() + * will succeed in providing a translation. */ if (!errmsg) errmsg = libpq_gettext("out of memory for query result"); @@ -695,10 +673,6 @@ failure: * Returns: 0 if processed message successfully, EOF to suspend parsing * (the latter case is not actually used currently). * In either case, conn->inStart has been advanced past the message. - * - * Note: the row processor could also choose to longjmp out of libpq, - * in which case the library's state must allow for resumption at the - * next message. */ static int getAnotherTuple(PGconn *conn, int msgLength) @@ -778,31 +752,15 @@ getAnotherTuple(PGconn *conn, int msgLength) goto advance_and_error; } - /* - * Advance inStart to show that the "D" message has been processed. We - * must do this before calling the row processor, in case it longjmps. - */ + /* Advance inStart to show that the "D" message has been processed. */ conn->inStart = conn->inCursor; - /* Pass the completed row values to rowProcessor */ + /* Process the collected row */ errmsg = NULL; - switch ((*conn->rowProcessor) (result, rowbuf, &errmsg, - conn->rowProcessorParam)) - { - case 1: - /* everything is good */ - return 0; + if (pqRowProcessor(conn, &errmsg)) + return 0; /* normal, successful exit */ - case -1: - /* error, report the errmsg below */ - break; - - default: - /* unrecognized return code */ - errmsg = libpq_gettext("unrecognized return value from row processor"); - break; - } - goto set_error_result; + goto set_error_result; /* pqRowProcessor failed, report it */ advance_and_error: /* Discard the failed message by pretending we read it */ @@ -817,7 +775,7 @@ set_error_result: pqClearAsyncResult(conn); /* - * If row processor didn't provide an error message, assume "out of + * If preceding code didn't provide an error message, assume "out of * memory" was meant. The advantage of having this special case is that * freeing the old result first greatly improves the odds that gettext() * will succeed in providing a translation. diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index 67db6119bb..9d05dd2060 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -90,7 +90,8 @@ typedef enum * backend */ PGRES_NONFATAL_ERROR, /* notice or warning message */ PGRES_FATAL_ERROR, /* query failed */ - PGRES_COPY_BOTH /* Copy In/Out data transfer in progress */ + PGRES_COPY_BOTH, /* Copy In/Out data transfer in progress */ + PGRES_SINGLE_TUPLE /* single tuple from larger resultset */ } ExecStatusType; typedef enum @@ -129,17 +130,6 @@ typedef struct pg_conn PGconn; */ typedef struct pg_result PGresult; -/* PGdataValue represents a data field value being passed to a row processor. - * It could be either text or binary data; text data is not zero-terminated. - * A SQL NULL is represented by len < 0; then value is still valid but there - * are no data bytes there. - */ -typedef struct pgDataValue -{ - int len; /* data length in bytes, or <0 if NULL */ - const char *value; /* data value, without zero-termination */ -} PGdataValue; - /* PGcancel encapsulates the information needed to cancel a running * query on an existing connection. * The contents of this struct are not supposed to be known to applications. @@ -161,10 +151,6 @@ typedef struct pgNotify struct pgNotify *next; /* list link */ } PGnotify; -/* Function type for row-processor callback */ -typedef int (*PQrowProcessor) (PGresult *res, const PGdataValue *columns, - const char **errmsgp, void *param); - /* Function types for notice-handling callbacks */ typedef void (*PQnoticeReceiver) (void *arg, const PGresult *res); typedef void (*PQnoticeProcessor) (void *arg, const char *message); @@ -403,17 +389,13 @@ extern int PQsendQueryPrepared(PGconn *conn, const int *paramLengths, const int *paramFormats, int resultFormat); +extern int PQsetSingleRowMode(PGconn *conn); extern PGresult *PQgetResult(PGconn *conn); -extern PGresult *PQskipResult(PGconn *conn); /* Routines for managing an asynchronous query */ extern int PQisBusy(PGconn *conn); extern int PQconsumeInput(PGconn *conn); -/* Override default per-row processing */ -extern void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param); -extern PQrowProcessor PQgetRowProcessor(const PGconn *conn, void **param); - /* LISTEN/NOTIFY support */ extern PGnotify *PQnotifies(PGconn *conn); diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 4bc89269fa..2bac59c3d8 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -277,6 +277,17 @@ typedef struct pgLobjfuncs Oid fn_lo_write; /* OID of backend function LOwrite */ } PGlobjfuncs; +/* PGdataValue represents a data field value being passed to a row processor. + * It could be either text or binary data; text data is not zero-terminated. + * A SQL NULL is represented by len < 0; then value is still valid but there + * are no data bytes there. + */ +typedef struct pgDataValue +{ + int len; /* data length in bytes, or <0 if NULL */ + const char *value; /* data value, without zero-termination */ +} PGdataValue; + /* * PGconn stores all the state data associated with a single connection * to a backend. @@ -324,10 +335,6 @@ struct pg_conn /* Optional file to write trace info to */ FILE *Pfdebug; - /* Callback procedure for per-row processing */ - PQrowProcessor rowProcessor; /* function pointer */ - void *rowProcessorParam; /* passthrough argument */ - /* Callback procedures for notice message processing */ PGNoticeHooks noticeHooks; @@ -346,6 +353,7 @@ struct pg_conn bool options_valid; /* true if OK to attempt connection */ bool nonblocking; /* whether this connection is using nonblock * sending semantics */ + bool singleRowMode; /* return current query result row-by-row? */ char copy_is_binary; /* 1 = copy binary, 0 = copy text */ int copy_already_done; /* # bytes already returned in COPY * OUT */ @@ -406,6 +414,7 @@ struct pg_conn /* Status for asynchronous result construction */ PGresult *result; /* result being constructed */ + PGresult *next_result; /* next result (used in single-row mode) */ /* Assorted state for SSL, GSS, etc */ @@ -517,6 +526,7 @@ extern void pqSaveMessageField(PGresult *res, char code, const char *value); extern void pqSaveParameterStatus(PGconn *conn, const char *name, const char *value); +extern int pqRowProcessor(PGconn *conn, const char **errmsgp); extern void pqHandleSendFailure(PGconn *conn); /* === in fe-protocol2.c === */