Add a "row processor" API to libpq for better handling of large results.

Traditionally libpq has collected an entire query result before passing
it back to the application.  That provides a simple and transactional API,
but it's pretty inefficient for large result sets.  This patch allows the
application to process each row on-the-fly instead of accumulating the
rows into the PGresult.  Error recovery becomes a bit more complex, but
often that tradeoff is well worth making.

Kyotaro Horiguchi, reviewed by Marko Kreen and Tom Lane
This commit is contained in:
Tom Lane 2012-04-04 18:27:56 -04:00
parent cb917e1544
commit 92785dac2e
10 changed files with 994 additions and 204 deletions

View File

@ -5581,6 +5581,274 @@ defaultNoticeProcessor(void *arg, const char *message)
</sect1>
<sect1 id="libpq-row-processor">
<title>Custom Row Processing</title>
<indexterm zone="libpq-row-processor">
<primary>PQrowProcessor</primary>
</indexterm>
<indexterm zone="libpq-row-processor">
<primary>row processor</primary>
<secondary>in libpq</secondary>
</indexterm>
<para>
Ordinarily, when receiving a query result from the server,
<application>libpq</> adds each row value to the current
<type>PGresult</type> until the entire result set is received; then
the <type>PGresult</type> is returned to the application as a unit.
This approach is simple to work with, but becomes inefficient for large
result sets. To improve performance, an application can register a
custom <firstterm>row processor</> function that processes each row
as the data is received from the network. The custom row processor could
process the data fully, or store it into some application-specific data
structure for later processing.
</para>
<caution>
<para>
The row processor function sees the rows before it is known whether the
query will succeed overall, since the server might return some rows before
encountering an error. For proper transactional behavior, it must be
possible to discard or undo whatever the row processor has done, if the
query ultimately fails.
</para>
</caution>
<para>
When using a custom row processor, row data is not accumulated into the
<type>PGresult</type>, so the <type>PGresult</type> ultimately delivered to
the application will contain no rows (<function>PQntuples</> =
<literal>0</>). However, it still has <function>PQresultStatus</> =
<literal>PGRES_TUPLES_OK</>, and it contains correct information about the
set of columns in the query result. On the other hand, if the query fails
partway through, the returned <type>PGresult</type> has
<function>PQresultStatus</> = <literal>PGRES_FATAL_ERROR</>. The
application must be prepared to undo any actions of the row processor
whenever it gets a <literal>PGRES_FATAL_ERROR</> result.
</para>
<para>
A custom row processor is registered for a particular connection by
calling <function>PQsetRowProcessor</function>, described below.
This row processor will be used for all subsequent query results on that
connection until changed again. A row processor function must have a
signature matching
<synopsis>
typedef int (*PQrowProcessor) (PGresult *res, const PGdataValue *columns,
const char **errmsgp, void *param);
</synopsis>
where <type>PGdataValue</> is described by
<synopsis>
typedef struct pgDataValue
{
int len; /* data length in bytes, or <0 if NULL */
const char *value; /* data value, without zero-termination */
} PGdataValue;
</synopsis>
</para>
<para>
The <parameter>res</> parameter is the <literal>PGRES_TUPLES_OK</>
<type>PGresult</type> that will eventually be delivered to the calling
application (if no error intervenes). It contains information about
the set of columns in the query result, but no row data. In particular the
row processor must fetch <literal>PQnfields(res)</> to know the number of
data columns.
</para>
<para>
Immediately after <application>libpq</> has determined the result set's
column information, it will make a call to the row processor with
<parameter>columns</parameter> set to NULL, but the other parameters as
usual. The row processor can use this call to initialize for a new result
set; if it has nothing to do, it can just return <literal>1</>. In
subsequent calls, one per received row, <parameter>columns</parameter>
is non-NULL and points to an array of <type>PGdataValue</> structs, one per
data column.
</para>
<para>
<parameter>errmsgp</parameter> is an output parameter used only for error
reporting. If the row processor needs to report an error, it can set
<literal>*</><parameter>errmsgp</parameter> to point to a suitable message
string (and then return <literal>-1</>). As a special case, returning
<literal>-1</> without changing <literal>*</><parameter>errmsgp</parameter>
from its initial value of NULL is taken to mean <quote>out of memory</>.
</para>
<para>
The last parameter, <parameter>param</parameter>, is just a void pointer
passed through from <function>PQsetRowProcessor</function>. This can be
used for communication between the row processor function and the
surrounding application.
</para>
<para>
In the <type>PGdataValue</> array passed to a row processor, data values
cannot be assumed to be zero-terminated, whether the data format is text
or binary. A SQL NULL value is indicated by a negative length field.
</para>
<para>
The row processor <emphasis>must</> process the row data values
immediately, or else copy them into application-controlled storage.
The value pointers passed to the row processor point into
<application>libpq</>'s internal data input buffer, which will be
overwritten by the next packet fetch.
</para>
<para>
The row processor function must return either <literal>1</> or
<literal>-1</>.
<literal>1</> is the normal, successful result value; <application>libpq</>
will continue with receiving row values from the server and passing them to
the row processor. <literal>-1</> indicates that the row processor has
encountered an error. In that case,
<application>libpq</> will discard all remaining rows in the result set
and then return a <literal>PGRES_FATAL_ERROR</> <type>PGresult</type> to
the application (containing the specified error message, or <quote>out of
memory for query result</> if <literal>*</><parameter>errmsgp</parameter>
was left as NULL).
</para>
<para>
Another option for exiting a row processor is to throw an exception using
C's <function>longjmp()</> or C++'s <literal>throw</>. If this is done,
processing of the incoming data can be resumed later by calling
<function>PQgetResult</>; the row processor will be invoked as normal for
any remaining rows in the current result.
As with any usage of <function>PQgetResult</>, the application
should continue calling <function>PQgetResult</> until it gets a NULL
result before issuing any new query.
</para>
<para>
In some cases, an exception may mean that the remainder of the
query result is not interesting. In such cases the application can discard
the remaining rows with <function>PQskipResult</>, described below.
Another possible recovery option is to close the connection altogether with
<function>PQfinish</>.
</para>
<para>
<variablelist>
<varlistentry id="libpq-pqsetrowprocessor">
<term>
<function>PQsetRowProcessor</function>
<indexterm>
<primary>PQsetRowProcessor</primary>
</indexterm>
</term>
<listitem>
<para>
Sets a callback function to process each row.
<synopsis>
void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param);
</synopsis>
</para>
<para>
The specified row processor function <parameter>func</> is installed as
the active row processor for the given connection <parameter>conn</>.
Also, <parameter>param</> is installed as the passthrough pointer to
pass to it. Alternatively, if <parameter>func</> is NULL, the standard
row processor is reinstalled on the given connection (and
<parameter>param</> is ignored).
</para>
<para>
Although the row processor can be changed at any time in the life of a
connection, it's generally unwise to do so while a query is active.
In particular, when using asynchronous mode, be aware that both
<function>PQisBusy</> and <function>PQgetResult</> can call the current
row processor.
</para>
</listitem>
</varlistentry>
<varlistentry id="libpq-pqgetrowprocessor">
<term>
<function>PQgetRowProcessor</function>
<indexterm>
<primary>PQgetRowProcessor</primary>
</indexterm>
</term>
<listitem>
<para>
Fetches the current row processor for the specified connection.
<synopsis>
PQrowProcessor PQgetRowProcessor(const PGconn *conn, void **param);
</synopsis>
</para>
<para>
In addition to returning the row processor function pointer, the
current passthrough pointer will be returned at
<literal>*</><parameter>param</>, if <parameter>param</> is not NULL.
</para>
</listitem>
</varlistentry>
<varlistentry id="libpq-pqskipresult">
<term>
<function>PQskipResult</function>
<indexterm>
<primary>PQskipResult</primary>
</indexterm>
</term>
<listitem>
<para>
Discard all the remaining rows in the incoming result set.
<synopsis>
PGresult *PQskipResult(PGconn *conn);
</synopsis>
</para>
<para>
This is a simple convenience function to discard incoming data after a
row processor has failed or it's determined that the rest of the result
set is not interesting. <function>PQskipResult</> is exactly
equivalent to <function>PQgetResult</> except that it transiently
installs a dummy row processor function that just discards data.
The returned <type>PGresult</> can be discarded without further ado
if it has status <literal>PGRES_TUPLES_OK</>; but other status values
should be handled normally. (In particular,
<literal>PGRES_FATAL_ERROR</> indicates a server-reported error that
will still need to be dealt with.)
As when using <function>PQgetResult</>, one should usually repeat the
call until NULL is returned to ensure the connection has reached an
idle state. Another possible usage is to call
<function>PQskipResult</> just once, and then resume using
<function>PQgetResult</> to process subsequent result sets normally.
</para>
<para>
Because <function>PQskipResult</> will wait for server input, it is not
very useful in asynchronous applications. In particular you should not
code a loop of <function>PQisBusy</> and <function>PQskipResult</>,
because that will result in the installed row processor being called
within <function>PQisBusy</>. To get the proper behavior in an
asynchronous application, you'll need to install a dummy row processor
(or set a flag to make your normal row processor do nothing) and leave
it that way until you have discarded all incoming data via your normal
<function>PQisBusy</> and <function>PQgetResult</> loop.
</para>
</listitem>
</varlistentry>
</variablelist>
</para>
</sect1>
<sect1 id="libpq-events">
<title>Event System</title>

View File

@ -160,3 +160,6 @@ PQconnectStartParams 157
PQping 158
PQpingParams 159
PQlibVersion 160
PQsetRowProcessor 161
PQgetRowProcessor 162
PQskipResult 163

View File

@ -2425,7 +2425,7 @@ keep_going: /* We will come back to here until there is
conn->status = CONNECTION_AUTH_OK;
/*
* Set asyncStatus so that PQsetResult will think that
* Set asyncStatus so that PQgetResult will think that
* what comes back next is the result of a query. See
* below.
*/
@ -2686,8 +2686,11 @@ makeEmptyPGconn(void)
/* Zero all pointers and booleans */
MemSet(conn, 0, sizeof(PGconn));
/* install default row processor and notice hooks */
PQsetRowProcessor(conn, NULL, NULL);
conn->noticeHooks.noticeRec = defaultNoticeReceiver;
conn->noticeHooks.noticeProc = defaultNoticeProcessor;
conn->status = CONNECTION_BAD;
conn->asyncStatus = PGASYNC_IDLE;
conn->xactStatus = PQTRANS_IDLE;
@ -2721,11 +2724,14 @@ makeEmptyPGconn(void)
conn->inBuffer = (char *) malloc(conn->inBufSize);
conn->outBufSize = 16 * 1024;
conn->outBuffer = (char *) malloc(conn->outBufSize);
conn->rowBufLen = 32;
conn->rowBuf = (PGdataValue *) malloc(conn->rowBufLen * sizeof(PGdataValue));
initPQExpBuffer(&conn->errorMessage);
initPQExpBuffer(&conn->workBuffer);
if (conn->inBuffer == NULL ||
conn->outBuffer == NULL ||
conn->rowBuf == NULL ||
PQExpBufferBroken(&conn->errorMessage) ||
PQExpBufferBroken(&conn->workBuffer))
{
@ -2829,6 +2835,8 @@ freePGconn(PGconn *conn)
free(conn->inBuffer);
if (conn->outBuffer)
free(conn->outBuffer);
if (conn->rowBuf)
free(conn->rowBuf);
termPQExpBuffer(&conn->errorMessage);
termPQExpBuffer(&conn->workBuffer);
@ -2888,7 +2896,7 @@ closePGconn(PGconn *conn)
conn->status = CONNECTION_BAD; /* Well, not really _bad_ - just
* absent */
conn->asyncStatus = PGASYNC_IDLE;
pqClearAsyncResult(conn); /* deallocate result and curTuple */
pqClearAsyncResult(conn); /* deallocate result */
pg_freeaddrinfo_all(conn->addrlist_family, conn->addrlist);
conn->addrlist = NULL;
conn->addr_cur = NULL;

View File

@ -50,6 +50,9 @@ static bool static_std_strings = false;
static PGEvent *dupEvents(PGEvent *events, int count);
static bool pqAddTuple(PGresult *res, PGresAttValue *tup);
static int pqStdRowProcessor(PGresult *res, const PGdataValue *columns,
const char **errmsgp, void *param);
static bool PQsendQueryStart(PGconn *conn);
static int PQsendQueryGuts(PGconn *conn,
const char *command,
@ -61,6 +64,8 @@ static int PQsendQueryGuts(PGconn *conn,
const int *paramFormats,
int resultFormat);
static void parseInput(PGconn *conn);
static int dummyRowProcessor(PGresult *res, const PGdataValue *columns,
const char **errmsgp, void *param);
static bool PQexecStart(PGconn *conn);
static PGresult *PQexecFinish(PGconn *conn);
static int PQsendDescribe(PGconn *conn, char desc_type,
@ -694,14 +699,12 @@ PQclear(PGresult *res)
/*
* Handy subroutine to deallocate any partially constructed async result.
*/
void
pqClearAsyncResult(PGconn *conn)
{
if (conn->result)
PQclear(conn->result);
conn->result = NULL;
conn->curTuple = NULL;
}
/*
@ -756,7 +759,6 @@ pqPrepareAsyncResult(PGconn *conn)
*/
res = conn->result;
conn->result = NULL; /* handing over ownership to caller */
conn->curTuple = NULL; /* just in case */
if (!res)
res = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
else
@ -832,7 +834,7 @@ pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
* add a row pointer to the PGresult structure, growing it if necessary
* Returns TRUE if OK, FALSE if not enough memory to add the row
*/
int
static bool
pqAddTuple(PGresult *res, PGresAttValue *tup)
{
if (res->ntups >= res->tupArrSize)
@ -978,6 +980,124 @@ pqSaveParameterStatus(PGconn *conn, const char *name, const char *value)
}
/*
* PQsetRowProcessor
* Set function that copies row data out from the network buffer,
* along with a passthrough parameter for it.
*/
void
PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param)
{
if (!conn)
return;
if (func)
{
/* set custom row processor */
conn->rowProcessor = func;
conn->rowProcessorParam = param;
}
else
{
/* set default row processor */
conn->rowProcessor = pqStdRowProcessor;
conn->rowProcessorParam = conn;
}
}
/*
* PQgetRowProcessor
* Get current row processor of PGconn.
* If param is not NULL, also store the passthrough parameter at *param.
*/
PQrowProcessor
PQgetRowProcessor(const PGconn *conn, void **param)
{
if (!conn)
{
if (param)
*param = NULL;
return NULL;
}
if (param)
*param = conn->rowProcessorParam;
return conn->rowProcessor;
}
/*
* pqStdRowProcessor
* Add the received row to the PGresult structure
* Returns 1 if OK, -1 if error occurred.
*
* Note: "param" should point to the PGconn, but we don't actually need that
* as of the current coding.
*/
static int
pqStdRowProcessor(PGresult *res, const PGdataValue *columns,
const char **errmsgp, void *param)
{
int nfields = res->numAttributes;
PGresAttValue *tup;
int i;
if (columns == NULL)
{
/* New result set ... we have nothing to do in this function. */
return 1;
}
/*
* Basically we just allocate space in the PGresult for each field and
* copy the data over.
*
* Note: on malloc failure, we return -1 leaving *errmsgp still NULL,
* which caller will take to mean "out of memory". This is preferable to
* trying to set up such a message here, because evidently there's not
* enough memory for gettext() to do anything.
*/
tup = (PGresAttValue *)
pqResultAlloc(res, nfields * sizeof(PGresAttValue), TRUE);
if (tup == NULL)
return -1;
for (i = 0; i < nfields; i++)
{
int clen = columns[i].len;
if (clen < 0)
{
/* null field */
tup[i].len = NULL_LEN;
tup[i].value = res->null_field;
}
else
{
bool isbinary = (res->attDescs[i].format != 0);
char *val;
val = (char *) pqResultAlloc(res, clen + 1, isbinary);
if (val == NULL)
return -1;
/* copy and zero-terminate the data (even if it's binary) */
memcpy(val, columns[i].value, clen);
val[clen] = '\0';
tup[i].len = clen;
tup[i].value = val;
}
}
/* And add the tuple to the PGresult's tuple array */
if (!pqAddTuple(res, tup))
return -1;
/* Success */
return 1;
}
/*
* PQsendQuery
* Submit a query, but don't wait for it to finish
@ -1223,7 +1343,6 @@ PQsendQueryStart(PGconn *conn)
/* initialize async result-accumulation state */
conn->result = NULL;
conn->curTuple = NULL;
/* ready to send command message */
return true;
@ -1468,6 +1587,9 @@ PQconsumeInput(PGconn *conn)
* parseInput: if appropriate, parse input data from backend
* until input is exhausted or a stopping state is reached.
* Note that this function will NOT attempt to read more data from the backend.
*
* Note: callers of parseInput must be prepared for a longjmp exit when we are
* in PGASYNC_BUSY state, since an external row processor might do that.
*/
static void
parseInput(PGconn *conn)
@ -1615,6 +1737,49 @@ PQgetResult(PGconn *conn)
return res;
}
/*
* PQskipResult
* Get the next PGresult produced by a query, but discard any data rows.
*
* This is mainly useful for cleaning up after a longjmp out of a row
* processor, when resuming processing of the current query result isn't
* wanted. Note that this is of little value in an async-style application,
* since any preceding calls to PQisBusy would have already called the regular
* row processor.
*/
PGresult *
PQskipResult(PGconn *conn)
{
PGresult *res;
PQrowProcessor savedRowProcessor;
if (!conn)
return NULL;
/* temporarily install dummy row processor */
savedRowProcessor = conn->rowProcessor;
conn->rowProcessor = dummyRowProcessor;
/* no need to save/change rowProcessorParam */
/* fetch the next result */
res = PQgetResult(conn);
/* restore previous row processor */
conn->rowProcessor = savedRowProcessor;
return res;
}
/*
* Do-nothing row processor for PQskipResult
*/
static int
dummyRowProcessor(PGresult *res, const PGdataValue *columns,
const char **errmsgp, void *param)
{
return 1;
}
/*
* PQexec
@ -1721,7 +1886,7 @@ PQexecStart(PGconn *conn)
* Silently discard any prior query result that application didn't eat.
* This is probably poor design, but it's here for backward compatibility.
*/
while ((result = PQgetResult(conn)) != NULL)
while ((result = PQskipResult(conn)) != NULL)
{
ExecStatusType resultStatus = result->resultStatus;

View File

@ -40,9 +40,7 @@
#define LO_BUFSIZE 8192
static int lo_initialize(PGconn *conn);
static Oid
lo_import_internal(PGconn *conn, const char *filename, const Oid oid);
static Oid lo_import_internal(PGconn *conn, const char *filename, Oid oid);
/*
* lo_open
@ -59,7 +57,7 @@ lo_open(PGconn *conn, Oid lobjId, int mode)
PQArgBlock argv[2];
PGresult *res;
if (conn->lobjfuncs == NULL)
if (conn == NULL || conn->lobjfuncs == NULL)
{
if (lo_initialize(conn) < 0)
return -1;
@ -101,7 +99,7 @@ lo_close(PGconn *conn, int fd)
int retval;
int result_len;
if (conn->lobjfuncs == NULL)
if (conn == NULL || conn->lobjfuncs == NULL)
{
if (lo_initialize(conn) < 0)
return -1;
@ -139,7 +137,7 @@ lo_truncate(PGconn *conn, int fd, size_t len)
int retval;
int result_len;
if (conn->lobjfuncs == NULL)
if (conn == NULL || conn->lobjfuncs == NULL)
{
if (lo_initialize(conn) < 0)
return -1;
@ -192,7 +190,7 @@ lo_read(PGconn *conn, int fd, char *buf, size_t len)
PGresult *res;
int result_len;
if (conn->lobjfuncs == NULL)
if (conn == NULL || conn->lobjfuncs == NULL)
{
if (lo_initialize(conn) < 0)
return -1;
@ -234,7 +232,7 @@ lo_write(PGconn *conn, int fd, const char *buf, size_t len)
int result_len;
int retval;
if (conn->lobjfuncs == NULL)
if (conn == NULL || conn->lobjfuncs == NULL)
{
if (lo_initialize(conn) < 0)
return -1;
@ -280,7 +278,7 @@ lo_lseek(PGconn *conn, int fd, int offset, int whence)
int retval;
int result_len;
if (conn->lobjfuncs == NULL)
if (conn == NULL || conn->lobjfuncs == NULL)
{
if (lo_initialize(conn) < 0)
return -1;
@ -328,7 +326,7 @@ lo_creat(PGconn *conn, int mode)
int retval;
int result_len;
if (conn->lobjfuncs == NULL)
if (conn == NULL || conn->lobjfuncs == NULL)
{
if (lo_initialize(conn) < 0)
return InvalidOid;
@ -367,7 +365,7 @@ lo_create(PGconn *conn, Oid lobjId)
int retval;
int result_len;
if (conn->lobjfuncs == NULL)
if (conn == NULL || conn->lobjfuncs == NULL)
{
if (lo_initialize(conn) < 0)
return InvalidOid;
@ -413,7 +411,7 @@ lo_tell(PGconn *conn, int fd)
PGresult *res;
int result_len;
if (conn->lobjfuncs == NULL)
if (conn == NULL || conn->lobjfuncs == NULL)
{
if (lo_initialize(conn) < 0)
return -1;
@ -451,7 +449,7 @@ lo_unlink(PGconn *conn, Oid lobjId)
int result_len;
int retval;
if (conn->lobjfuncs == NULL)
if (conn == NULL || conn->lobjfuncs == NULL)
{
if (lo_initialize(conn) < 0)
return -1;
@ -505,7 +503,7 @@ lo_import_with_oid(PGconn *conn, const char *filename, Oid lobjId)
}
static Oid
lo_import_internal(PGconn *conn, const char *filename, const Oid oid)
lo_import_internal(PGconn *conn, const char *filename, Oid oid)
{
int fd;
int nbytes,
@ -684,8 +682,13 @@ lo_initialize(PGconn *conn)
int n;
const char *query;
const char *fname;
PQrowProcessor savedRowProcessor;
void *savedRowProcessorParam;
Oid foid;
if (!conn)
return -1;
/*
* Allocate the structure to hold the functions OID's
*/
@ -729,7 +732,16 @@ lo_initialize(PGconn *conn)
"or proname = 'loread' "
"or proname = 'lowrite'";
/* Ensure the standard row processor is used to collect the result */
savedRowProcessor = conn->rowProcessor;
savedRowProcessorParam = conn->rowProcessorParam;
PQsetRowProcessor(conn, NULL, NULL);
res = PQexec(conn, query);
conn->rowProcessor = savedRowProcessor;
conn->rowProcessorParam = savedRowProcessorParam;
if (res == NULL)
{
free(lobjfuncs);

View File

@ -218,6 +218,32 @@ pqGetnchar(char *s, size_t len, PGconn *conn)
return 0;
}
/*
* pqSkipnchar:
* skip over len bytes in input buffer.
*
* Note: this is primarily useful for its debug output, which should
* be exactly the same as for pqGetnchar. We assume the data in question
* will actually be used, but just isn't getting copied anywhere as yet.
*/
int
pqSkipnchar(size_t len, PGconn *conn)
{
if (len > (size_t) (conn->inEnd - conn->inCursor))
return EOF;
if (conn->Pfdebug)
{
fprintf(conn->Pfdebug, "From backend (%lu)> ", (unsigned long) len);
fputnbytes(conn->Pfdebug, conn->inBuffer + conn->inCursor, len);
fprintf(conn->Pfdebug, "\n");
}
conn->inCursor += len;
return 0;
}
/*
* pqPutnchar:
* write exactly len bytes to the current message

View File

@ -49,11 +49,19 @@ static int getNotify(PGconn *conn);
PostgresPollingStatusType
pqSetenvPoll(PGconn *conn)
{
PostgresPollingStatusType result;
PGresult *res;
PQrowProcessor savedRowProcessor;
void *savedRowProcessorParam;
if (conn == NULL || conn->status == CONNECTION_BAD)
return PGRES_POLLING_FAILED;
/* Ensure the standard row processor is used to collect any results */
savedRowProcessor = conn->rowProcessor;
savedRowProcessorParam = conn->rowProcessorParam;
PQsetRowProcessor(conn, NULL, NULL);
/* Check whether there are any data for us */
switch (conn->setenv_state)
{
@ -69,7 +77,10 @@ pqSetenvPoll(PGconn *conn)
if (n < 0)
goto error_return;
if (n == 0)
return PGRES_POLLING_READING;
{
result = PGRES_POLLING_READING;
goto normal_return;
}
break;
}
@ -83,7 +94,8 @@ pqSetenvPoll(PGconn *conn)
/* Should we raise an error if called when not active? */
case SETENV_STATE_IDLE:
return PGRES_POLLING_OK;
result = PGRES_POLLING_OK;
goto normal_return;
default:
printfPQExpBuffer(&conn->errorMessage,
@ -180,7 +192,10 @@ pqSetenvPoll(PGconn *conn)
case SETENV_STATE_CLIENT_ENCODING_WAIT:
{
if (PQisBusy(conn))
return PGRES_POLLING_READING;
{
result = PGRES_POLLING_READING;
goto normal_return;
}
res = PQgetResult(conn);
@ -205,7 +220,10 @@ pqSetenvPoll(PGconn *conn)
case SETENV_STATE_OPTION_WAIT:
{
if (PQisBusy(conn))
return PGRES_POLLING_READING;
{
result = PGRES_POLLING_READING;
goto normal_return;
}
res = PQgetResult(conn);
@ -244,13 +262,17 @@ pqSetenvPoll(PGconn *conn)
goto error_return;
conn->setenv_state = SETENV_STATE_QUERY1_WAIT;
return PGRES_POLLING_READING;
result = PGRES_POLLING_READING;
goto normal_return;
}
case SETENV_STATE_QUERY1_WAIT:
{
if (PQisBusy(conn))
return PGRES_POLLING_READING;
{
result = PGRES_POLLING_READING;
goto normal_return;
}
res = PQgetResult(conn);
@ -327,13 +349,17 @@ pqSetenvPoll(PGconn *conn)
goto error_return;
conn->setenv_state = SETENV_STATE_QUERY2_WAIT;
return PGRES_POLLING_READING;
result = PGRES_POLLING_READING;
goto normal_return;
}
case SETENV_STATE_QUERY2_WAIT:
{
if (PQisBusy(conn))
return PGRES_POLLING_READING;
{
result = PGRES_POLLING_READING;
goto normal_return;
}
res = PQgetResult(conn);
@ -380,7 +406,8 @@ pqSetenvPoll(PGconn *conn)
{
/* Query finished, so we're done */
conn->setenv_state = SETENV_STATE_IDLE;
return PGRES_POLLING_OK;
result = PGRES_POLLING_OK;
goto normal_return;
}
break;
}
@ -398,7 +425,12 @@ pqSetenvPoll(PGconn *conn)
error_return:
conn->setenv_state = SETENV_STATE_IDLE;
return PGRES_POLLING_FAILED;
result = PGRES_POLLING_FAILED;
normal_return:
conn->rowProcessor = savedRowProcessor;
conn->rowProcessorParam = savedRowProcessorParam;
return result;
}
@ -406,6 +438,9 @@ error_return:
* parseInput: if appropriate, parse input data from backend
* until input is exhausted or a stopping state is reached.
* Note that this function will NOT attempt to read more data from the backend.
*
* Note: callers of parseInput must be prepared for a longjmp exit when we are
* in PGASYNC_BUSY state, since an external row processor might do that.
*/
void
pqParseInput2(PGconn *conn)
@ -549,6 +584,8 @@ pqParseInput2(PGconn *conn)
/* First 'T' in a query sequence */
if (getRowDescriptions(conn))
return;
/* getRowDescriptions() moves inStart itself */
continue;
}
else
{
@ -569,6 +606,8 @@ pqParseInput2(PGconn *conn)
/* Read another tuple of a normal query response */
if (getAnotherTuple(conn, FALSE))
return;
/* getAnotherTuple() moves inStart itself */
continue;
}
else
{
@ -585,6 +624,8 @@ pqParseInput2(PGconn *conn)
/* Read another tuple of a normal query response */
if (getAnotherTuple(conn, TRUE))
return;
/* getAnotherTuple() moves inStart itself */
continue;
}
else
{
@ -627,27 +668,32 @@ pqParseInput2(PGconn *conn)
/*
* parseInput subroutine to read a 'T' (row descriptions) message.
* We build a PGresult structure containing the attribute data.
* Returns: 0 if completed message, EOF if not enough data yet.
* Returns: 0 if completed message, EOF if error or not enough data
* received yet.
*
* Note that if we run out of data, we have to release the partially
* constructed PGresult, and rebuild it again next time. Fortunately,
* that shouldn't happen often, since 'T' messages usually fit in a packet.
* Note that if we run out of data, we have to suspend and reprocess
* the message after more data is received. Otherwise, conn->inStart
* must get advanced past the processed data.
*/
static int
getRowDescriptions(PGconn *conn)
{
PGresult *result = NULL;
PGresult *result;
int nfields;
const char *errmsg;
int i;
result = PQmakeEmptyPGresult(conn, PGRES_TUPLES_OK);
if (!result)
goto failure;
{
errmsg = NULL; /* means "out of memory", see below */
goto advance_and_error;
}
/* parseInput already read the 'T' label. */
/* the next two bytes are the number of fields */
if (pqGetInt(&(result->numAttributes), 2, conn))
goto failure;
goto EOFexit;
nfields = result->numAttributes;
/* allocate space for the attribute descriptors */
@ -656,7 +702,10 @@ getRowDescriptions(PGconn *conn)
result->attDescs = (PGresAttDesc *)
pqResultAlloc(result, nfields * sizeof(PGresAttDesc), TRUE);
if (!result->attDescs)
goto failure;
{
errmsg = NULL; /* means "out of memory", see below */
goto advance_and_error;
}
MemSet(result->attDescs, 0, nfields * sizeof(PGresAttDesc));
}
@ -671,7 +720,7 @@ getRowDescriptions(PGconn *conn)
pqGetInt(&typid, 4, conn) ||
pqGetInt(&typlen, 2, conn) ||
pqGetInt(&atttypmod, 4, conn))
goto failure;
goto EOFexit;
/*
* Since pqGetInt treats 2-byte integers as unsigned, we need to
@ -682,7 +731,10 @@ getRowDescriptions(PGconn *conn)
result->attDescs[i].name = pqResultStrdup(result,
conn->workBuffer.data);
if (!result->attDescs[i].name)
goto failure;
{
errmsg = NULL; /* means "out of memory", see below */
goto advance_and_error;
}
result->attDescs[i].tableid = 0;
result->attDescs[i].columnid = 0;
result->attDescs[i].format = 0;
@ -693,30 +745,90 @@ getRowDescriptions(PGconn *conn)
/* Success! */
conn->result = result;
return 0;
failure:
if (result)
/*
* Advance inStart to show that the "T" message has been processed. We
* must do this before calling the row processor, in case it longjmps.
*/
conn->inStart = conn->inCursor;
/* Give the row processor a chance to initialize for new result set */
errmsg = NULL;
switch ((*conn->rowProcessor) (result, NULL, &errmsg,
conn->rowProcessorParam))
{
case 1:
/* everything is good */
return 0;
case -1:
/* error, report the errmsg below */
break;
default:
/* unrecognized return code */
errmsg = libpq_gettext("unrecognized return value from row processor");
break;
}
goto set_error_result;
advance_and_error:
/*
* Discard the failed message. Unfortunately we don't know for sure
* where the end is, so just throw away everything in the input buffer.
* This is not very desirable but it's the best we can do in protocol v2.
*/
conn->inStart = conn->inEnd;
set_error_result:
/*
* Replace partially constructed result with an error result. First
* discard the old result to try to win back some memory.
*/
pqClearAsyncResult(conn);
/*
* If row processor didn't provide an error message, assume "out of
* memory" was meant. The advantage of having this special case is that
* freeing the old result first greatly improves the odds that gettext()
* will succeed in providing a translation.
*/
if (!errmsg)
errmsg = libpq_gettext("out of memory for query result");
printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);
/*
* XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
* do to recover...
*/
conn->result = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
conn->asyncStatus = PGASYNC_READY;
EOFexit:
if (result && result != conn->result)
PQclear(result);
return EOF;
}
/*
* parseInput subroutine to read a 'B' or 'D' (row data) message.
* We add another tuple to the existing PGresult structure.
* Returns: 0 if completed message, EOF if error or not enough data yet.
* We fill rowbuf with column pointers and then call the row processor.
* Returns: 0 if completed message, EOF if error or not enough data
* received yet.
*
* Note that if we run out of data, we have to suspend and reprocess
* the message after more data is received. We keep a partially constructed
* tuple in conn->curTuple, and avoid reallocating already-allocated storage.
* the message after more data is received. Otherwise, conn->inStart
* must get advanced past the processed data.
*/
static int
getAnotherTuple(PGconn *conn, bool binary)
{
PGresult *result = conn->result;
int nfields = result->numAttributes;
PGresAttValue *tup;
const char *errmsg;
PGdataValue *rowbuf;
/* the backend sends us a bitmap of which attributes are null */
char std_bitmap[64]; /* used unless it doesn't fit */
char *bitmap = std_bitmap;
@ -727,28 +839,33 @@ getAnotherTuple(PGconn *conn, bool binary)
int bitcnt; /* number of bits examined in current byte */
int vlen; /* length of the current field value */
/* Resize row buffer if needed */
rowbuf = conn->rowBuf;
if (nfields > conn->rowBufLen)
{
rowbuf = (PGdataValue *) realloc(rowbuf,
nfields * sizeof(PGdataValue));
if (!rowbuf)
{
errmsg = NULL; /* means "out of memory", see below */
goto advance_and_error;
}
conn->rowBuf = rowbuf;
conn->rowBufLen = nfields;
}
/* Save format specifier */
result->binary = binary;
/* Allocate tuple space if first time for this data message */
if (conn->curTuple == NULL)
/*
* If it's binary, fix the column format indicators. We assume the
* backend will consistently send either B or D, not a mix.
*/
if (binary)
{
conn->curTuple = (PGresAttValue *)
pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
if (conn->curTuple == NULL)
goto outOfMemory;
MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
/*
* If it's binary, fix the column format indicators. We assume the
* backend will consistently send either B or D, not a mix.
*/
if (binary)
{
for (i = 0; i < nfields; i++)
result->attDescs[i].format = 1;
}
for (i = 0; i < nfields; i++)
result->attDescs[i].format = 1;
}
tup = conn->curTuple;
/* Get the null-value bitmap */
nbytes = (nfields + BITS_PER_BYTE - 1) / BITS_PER_BYTE;
@ -757,7 +874,10 @@ getAnotherTuple(PGconn *conn, bool binary)
{
bitmap = (char *) malloc(nbytes);
if (!bitmap)
goto outOfMemory;
{
errmsg = NULL; /* means "out of memory", see below */
goto advance_and_error;
}
}
if (pqGetnchar(bitmap, nbytes, conn))
@ -770,35 +890,34 @@ getAnotherTuple(PGconn *conn, bool binary)
for (i = 0; i < nfields; i++)
{
/* get the value length */
if (!(bmap & 0200))
{
/* if the field value is absent, make it a null string */
tup[i].value = result->null_field;
tup[i].len = NULL_LEN;
}
vlen = NULL_LEN;
else if (pqGetInt(&vlen, 4, conn))
goto EOFexit;
else
{
/* get the value length (the first four bytes are for length) */
if (pqGetInt(&vlen, 4, conn))
goto EOFexit;
if (!binary)
vlen = vlen - 4;
if (vlen < 0)
vlen = 0;
if (tup[i].value == NULL)
{
tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary);
if (tup[i].value == NULL)
goto outOfMemory;
}
tup[i].len = vlen;
/* read in the value */
if (vlen > 0)
if (pqGetnchar((char *) (tup[i].value), vlen, conn))
goto EOFexit;
/* we have to terminate this ourselves */
tup[i].value[vlen] = '\0';
}
rowbuf[i].len = vlen;
/*
* rowbuf[i].value always points to the next address in the data
* buffer even if the value is NULL. This allows row processors to
* estimate data sizes more easily.
*/
rowbuf[i].value = conn->inBuffer + conn->inCursor;
/* Skip over the data value */
if (vlen > 0)
{
if (pqSkipnchar(vlen, conn))
goto EOFexit;
}
/* advance the bitmap stuff */
bitcnt++;
if (bitcnt == BITS_PER_BYTE)
@ -811,26 +930,63 @@ getAnotherTuple(PGconn *conn, bool binary)
bmap <<= 1;
}
/* Success! Store the completed tuple in the result */
if (!pqAddTuple(result, tup))
goto outOfMemory;
/* and reset for a new message */
conn->curTuple = NULL;
/* Release bitmap now if we allocated it */
if (bitmap != std_bitmap)
free(bitmap);
return 0;
outOfMemory:
/* Replace partially constructed result with an error result */
bitmap = NULL;
/*
* we do NOT use pqSaveErrorResult() here, because of the likelihood that
* there's not enough memory to concatenate messages...
* Advance inStart to show that the "D" message has been processed. We
* must do this before calling the row processor, in case it longjmps.
*/
conn->inStart = conn->inCursor;
/* Pass the completed row values to rowProcessor */
errmsg = NULL;
switch ((*conn->rowProcessor) (result, rowbuf, &errmsg,
conn->rowProcessorParam))
{
case 1:
/* everything is good */
return 0;
case -1:
/* error, report the errmsg below */
break;
default:
/* unrecognized return code */
errmsg = libpq_gettext("unrecognized return value from row processor");
break;
}
goto set_error_result;
advance_and_error:
/*
* Discard the failed message. Unfortunately we don't know for sure
* where the end is, so just throw away everything in the input buffer.
* This is not very desirable but it's the best we can do in protocol v2.
*/
conn->inStart = conn->inEnd;
set_error_result:
/*
* Replace partially constructed result with an error result. First
* discard the old result to try to win back some memory.
*/
pqClearAsyncResult(conn);
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("out of memory for query result\n"));
/*
* If row processor didn't provide an error message, assume "out of
* memory" was meant. The advantage of having this special case is that
* freeing the old result first greatly improves the odds that gettext()
* will succeed in providing a translation.
*/
if (!errmsg)
errmsg = libpq_gettext("out of memory for query result");
printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);
/*
* XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
@ -838,8 +994,6 @@ outOfMemory:
*/
conn->result = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
conn->asyncStatus = PGASYNC_READY;
/* Discard the failed message --- good idea? */
conn->inStart = conn->inEnd;
EOFexit:
if (bitmap != NULL && bitmap != std_bitmap)
@ -1122,7 +1276,8 @@ pqGetline2(PGconn *conn, char *s, int maxlen)
{
int result = 1; /* return value if buffer overflows */
if (conn->sock < 0)
if (conn->sock < 0 ||
conn->asyncStatus != PGASYNC_COPY_OUT)
{
*s = '\0';
return EOF;

View File

@ -44,7 +44,7 @@
static void handleSyncLoss(PGconn *conn, char id, int msgLength);
static int getRowDescriptions(PGconn *conn);
static int getRowDescriptions(PGconn *conn, int msgLength);
static int getParamDescriptions(PGconn *conn);
static int getAnotherTuple(PGconn *conn, int msgLength);
static int getParameterStatus(PGconn *conn);
@ -61,6 +61,9 @@ static int build_startup_packet(const PGconn *conn, char *packet,
* parseInput: if appropriate, parse input data from backend
* until input is exhausted or a stopping state is reached.
* Note that this function will NOT attempt to read more data from the backend.
*
* Note: callers of parseInput must be prepared for a longjmp exit when we are
* in PGASYNC_BUSY state, since an external row processor might do that.
*/
void
pqParseInput3(PGconn *conn)
@ -269,15 +272,10 @@ pqParseInput3(PGconn *conn)
conn->queryclass == PGQUERY_DESCRIBE)
{
/* First 'T' in a query sequence */
if (getRowDescriptions(conn))
if (getRowDescriptions(conn, msgLength))
return;
/*
* If we're doing a Describe, we're ready to pass the
* result back to the client.
*/
if (conn->queryclass == PGQUERY_DESCRIBE)
conn->asyncStatus = PGASYNC_READY;
/* getRowDescriptions() moves inStart itself */
continue;
}
else
{
@ -327,6 +325,8 @@ pqParseInput3(PGconn *conn)
/* Read another tuple of a normal query response */
if (getAnotherTuple(conn, msgLength))
return;
/* getAnotherTuple() moves inStart itself */
continue;
}
else if (conn->result != NULL &&
conn->result->resultStatus == PGRES_FATAL_ERROR)
@ -443,17 +443,20 @@ handleSyncLoss(PGconn *conn, char id, int msgLength)
* parseInput subroutine to read a 'T' (row descriptions) message.
* We'll build a new PGresult structure (unless called for a Describe
* command for a prepared statement) containing the attribute data.
* Returns: 0 if completed message, EOF if not enough data yet.
* Returns: 0 if processed message successfully, EOF to suspend parsing
* (the latter case is not actually used currently).
* In either case, conn->inStart has been advanced past the message.
*
* Note that if we run out of data, we have to release the partially
* constructed PGresult, and rebuild it again next time. Fortunately,
* that shouldn't happen often, since 'T' messages usually fit in a packet.
* Note: the row processor could also choose to longjmp out of libpq,
* in which case the library's state must allow for resumption at the
* next message.
*/
static int
getRowDescriptions(PGconn *conn)
getRowDescriptions(PGconn *conn, int msgLength)
{
PGresult *result;
int nfields;
const char *errmsg;
int i;
/*
@ -471,12 +474,19 @@ getRowDescriptions(PGconn *conn)
else
result = PQmakeEmptyPGresult(conn, PGRES_TUPLES_OK);
if (!result)
goto failure;
{
errmsg = NULL; /* means "out of memory", see below */
goto advance_and_error;
}
/* parseInput already read the 'T' label and message length. */
/* the next two bytes are the number of fields */
if (pqGetInt(&(result->numAttributes), 2, conn))
goto failure;
{
/* We should not run out of data here, so complain */
errmsg = libpq_gettext("insufficient data in \"T\" message");
goto advance_and_error;
}
nfields = result->numAttributes;
/* allocate space for the attribute descriptors */
@ -485,7 +495,10 @@ getRowDescriptions(PGconn *conn)
result->attDescs = (PGresAttDesc *)
pqResultAlloc(result, nfields * sizeof(PGresAttDesc), TRUE);
if (!result->attDescs)
goto failure;
{
errmsg = NULL; /* means "out of memory", see below */
goto advance_and_error;
}
MemSet(result->attDescs, 0, nfields * sizeof(PGresAttDesc));
}
@ -510,7 +523,9 @@ getRowDescriptions(PGconn *conn)
pqGetInt(&atttypmod, 4, conn) ||
pqGetInt(&format, 2, conn))
{
goto failure;
/* We should not run out of data here, so complain */
errmsg = libpq_gettext("insufficient data in \"T\" message");
goto advance_and_error;
}
/*
@ -524,7 +539,10 @@ getRowDescriptions(PGconn *conn)
result->attDescs[i].name = pqResultStrdup(result,
conn->workBuffer.data);
if (!result->attDescs[i].name)
goto failure;
{
errmsg = NULL; /* means "out of memory", see below */
goto advance_and_error;
}
result->attDescs[i].tableid = tableid;
result->attDescs[i].columnid = columnid;
result->attDescs[i].format = format;
@ -536,24 +554,84 @@ getRowDescriptions(PGconn *conn)
result->binary = 0;
}
/* Sanity check that we absorbed all the data */
if (conn->inCursor != conn->inStart + 5 + msgLength)
{
errmsg = libpq_gettext("extraneous data in \"T\" message");
goto advance_and_error;
}
/* Success! */
conn->result = result;
return 0;
failure:
/*
* Discard incomplete result, unless it's from getParamDescriptions.
*
* Note that if we hit a bufferload boundary while handling the
* describe-statement case, we'll forget any PGresult space we just
* allocated, and then reallocate it on next try. This will bloat the
* PGresult a little bit but the space will be freed at PQclear, so it
* doesn't seem worth trying to be smarter.
* Advance inStart to show that the "T" message has been processed. We
* must do this before calling the row processor, in case it longjmps.
*/
if (result != conn->result)
conn->inStart = conn->inCursor;
/*
* If we're doing a Describe, we're done, and ready to pass the result
* back to the client.
*/
if (conn->queryclass == PGQUERY_DESCRIBE)
{
conn->asyncStatus = PGASYNC_READY;
return 0;
}
/* Give the row processor a chance to initialize for new result set */
errmsg = NULL;
switch ((*conn->rowProcessor) (result, NULL, &errmsg,
conn->rowProcessorParam))
{
case 1:
/* everything is good */
return 0;
case -1:
/* error, report the errmsg below */
break;
default:
/* unrecognized return code */
errmsg = libpq_gettext("unrecognized return value from row processor");
break;
}
goto set_error_result;
advance_and_error:
/* Discard unsaved result, if any */
if (result && result != conn->result)
PQclear(result);
return EOF;
/* Discard the failed message by pretending we read it */
conn->inStart += 5 + msgLength;
set_error_result:
/*
* Replace partially constructed result with an error result. First
* discard the old result to try to win back some memory.
*/
pqClearAsyncResult(conn);
/*
* If row processor didn't provide an error message, assume "out of
* memory" was meant.
*/
if (!errmsg)
errmsg = libpq_gettext("out of memory for query result");
printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);
pqSaveErrorResult(conn);
/*
* Return zero to allow input parsing to continue. Subsequent "D"
* messages will be ignored until we get to end of data, since an error
* result is already set up.
*/
return 0;
}
/*
@ -613,47 +691,53 @@ failure:
/*
* parseInput subroutine to read a 'D' (row data) message.
* We add another tuple to the existing PGresult structure.
* Returns: 0 if completed message, EOF if error or not enough data yet.
* We fill rowbuf with column pointers and then call the row processor.
* Returns: 0 if processed message successfully, EOF to suspend parsing
* (the latter case is not actually used currently).
* In either case, conn->inStart has been advanced past the message.
*
* Note that if we run out of data, we have to suspend and reprocess
* the message after more data is received. We keep a partially constructed
* tuple in conn->curTuple, and avoid reallocating already-allocated storage.
* Note: the row processor could also choose to longjmp out of libpq,
* in which case the library's state must allow for resumption at the
* next message.
*/
static int
getAnotherTuple(PGconn *conn, int msgLength)
{
PGresult *result = conn->result;
int nfields = result->numAttributes;
PGresAttValue *tup;
const char *errmsg;
PGdataValue *rowbuf;
int tupnfields; /* # fields from tuple */
int vlen; /* length of the current field value */
int i;
/* Allocate tuple space if first time for this data message */
if (conn->curTuple == NULL)
{
conn->curTuple = (PGresAttValue *)
pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
if (conn->curTuple == NULL)
goto outOfMemory;
MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
}
tup = conn->curTuple;
/* Get the field count and make sure it's what we expect */
if (pqGetInt(&tupnfields, 2, conn))
return EOF;
{
/* We should not run out of data here, so complain */
errmsg = libpq_gettext("insufficient data in \"D\" message");
goto advance_and_error;
}
if (tupnfields != nfields)
{
/* Replace partially constructed result with an error result */
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("unexpected field count in \"D\" message\n"));
pqSaveErrorResult(conn);
/* Discard the failed message by pretending we read it */
conn->inCursor = conn->inStart + 5 + msgLength;
return 0;
errmsg = libpq_gettext("unexpected field count in \"D\" message");
goto advance_and_error;
}
/* Resize row buffer if needed */
rowbuf = conn->rowBuf;
if (nfields > conn->rowBufLen)
{
rowbuf = (PGdataValue *) realloc(rowbuf,
nfields * sizeof(PGdataValue));
if (!rowbuf)
{
errmsg = NULL; /* means "out of memory", see below */
goto advance_and_error;
}
conn->rowBuf = rowbuf;
conn->rowBufLen = nfields;
}
/* Scan the fields */
@ -661,54 +745,94 @@ getAnotherTuple(PGconn *conn, int msgLength)
{
/* get the value length */
if (pqGetInt(&vlen, 4, conn))
return EOF;
if (vlen == -1)
{
/* null field */
tup[i].value = result->null_field;
tup[i].len = NULL_LEN;
continue;
/* We should not run out of data here, so complain */
errmsg = libpq_gettext("insufficient data in \"D\" message");
goto advance_and_error;
}
if (vlen < 0)
vlen = 0;
if (tup[i].value == NULL)
{
bool isbinary = (result->attDescs[i].format != 0);
rowbuf[i].len = vlen;
tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary);
if (tup[i].value == NULL)
goto outOfMemory;
}
tup[i].len = vlen;
/* read in the value */
/*
* rowbuf[i].value always points to the next address in the data
* buffer even if the value is NULL. This allows row processors to
* estimate data sizes more easily.
*/
rowbuf[i].value = conn->inBuffer + conn->inCursor;
/* Skip over the data value */
if (vlen > 0)
if (pqGetnchar((char *) (tup[i].value), vlen, conn))
return EOF;
/* we have to terminate this ourselves */
tup[i].value[vlen] = '\0';
{
if (pqSkipnchar(vlen, conn))
{
/* We should not run out of data here, so complain */
errmsg = libpq_gettext("insufficient data in \"D\" message");
goto advance_and_error;
}
}
}
/* Success! Store the completed tuple in the result */
if (!pqAddTuple(result, tup))
goto outOfMemory;
/* and reset for a new message */
conn->curTuple = NULL;
/* Sanity check that we absorbed all the data */
if (conn->inCursor != conn->inStart + 5 + msgLength)
{
errmsg = libpq_gettext("extraneous data in \"D\" message");
goto advance_and_error;
}
return 0;
/*
* Advance inStart to show that the "D" message has been processed. We
* must do this before calling the row processor, in case it longjmps.
*/
conn->inStart = conn->inCursor;
outOfMemory:
/* Pass the completed row values to rowProcessor */
errmsg = NULL;
switch ((*conn->rowProcessor) (result, rowbuf, &errmsg,
conn->rowProcessorParam))
{
case 1:
/* everything is good */
return 0;
case -1:
/* error, report the errmsg below */
break;
default:
/* unrecognized return code */
errmsg = libpq_gettext("unrecognized return value from row processor");
break;
}
goto set_error_result;
advance_and_error:
/* Discard the failed message by pretending we read it */
conn->inStart += 5 + msgLength;
set_error_result:
/*
* Replace partially constructed result with an error result. First
* discard the old result to try to win back some memory.
*/
pqClearAsyncResult(conn);
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("out of memory for query result\n"));
/*
* If row processor didn't provide an error message, assume "out of
* memory" was meant. The advantage of having this special case is that
* freeing the old result first greatly improves the odds that gettext()
* will succeed in providing a translation.
*/
if (!errmsg)
errmsg = libpq_gettext("out of memory for query result");
printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);
pqSaveErrorResult(conn);
/* Discard the failed message by pretending we read it */
conn->inCursor = conn->inStart + 5 + msgLength;
/*
* Return zero to allow input parsing to continue. Subsequent "D"
* messages will be ignored until we get to end of data, since an error
* result is already set up.
*/
return 0;
}

View File

@ -38,13 +38,14 @@ extern "C"
/* Application-visible enum types */
/*
* Although it is okay to add to these lists, values which become unused
* should never be removed, nor should constants be redefined - that would
* break compatibility with existing code.
*/
typedef enum
{
/*
* Although it is okay to add to this list, values which become unused
* should never be removed, nor should constants be redefined - that would
* break compatibility with existing code.
*/
CONNECTION_OK,
CONNECTION_BAD,
/* Non-blocking mode only below here */
@ -128,6 +129,17 @@ typedef struct pg_conn PGconn;
*/
typedef struct pg_result PGresult;
/* PGdataValue represents a data field value being passed to a row processor.
* It could be either text or binary data; text data is not zero-terminated.
* A SQL NULL is represented by len < 0; then value is still valid but there
* are no data bytes there.
*/
typedef struct pgDataValue
{
int len; /* data length in bytes, or <0 if NULL */
const char *value; /* data value, without zero-termination */
} PGdataValue;
/* PGcancel encapsulates the information needed to cancel a running
* query on an existing connection.
* The contents of this struct are not supposed to be known to applications.
@ -149,6 +161,10 @@ typedef struct pgNotify
struct pgNotify *next; /* list link */
} PGnotify;
/* Function type for row-processor callback */
typedef int (*PQrowProcessor) (PGresult *res, const PGdataValue *columns,
const char **errmsgp, void *param);
/* Function types for notice-handling callbacks */
typedef void (*PQnoticeReceiver) (void *arg, const PGresult *res);
typedef void (*PQnoticeProcessor) (void *arg, const char *message);
@ -388,11 +404,16 @@ extern int PQsendQueryPrepared(PGconn *conn,
const int *paramFormats,
int resultFormat);
extern PGresult *PQgetResult(PGconn *conn);
extern PGresult *PQskipResult(PGconn *conn);
/* Routines for managing an asynchronous query */
extern int PQisBusy(PGconn *conn);
extern int PQconsumeInput(PGconn *conn);
/* Override default per-row processing */
extern void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param);
extern PQrowProcessor PQgetRowProcessor(const PGconn *conn, void **param);
/* LISTEN/NOTIFY support */
extern PGnotify *PQnotifies(PGconn *conn);

View File

@ -324,6 +324,10 @@ struct pg_conn
/* Optional file to write trace info to */
FILE *Pfdebug;
/* Callback procedure for per-row processing */
PQrowProcessor rowProcessor; /* function pointer */
void *rowProcessorParam; /* passthrough argument */
/* Callback procedures for notice message processing */
PGNoticeHooks noticeHooks;
@ -396,9 +400,14 @@ struct pg_conn
* msg has no length word */
int outMsgEnd; /* offset to msg end (so far) */
/* Row processor interface workspace */
PGdataValue *rowBuf; /* array for passing values to rowProcessor */
int rowBufLen; /* number of entries allocated in rowBuf */
/* Status for asynchronous result construction */
PGresult *result; /* result being constructed */
PGresAttValue *curTuple; /* tuple currently being read */
/* Assorted state for SSL, GSS, etc */
#ifdef USE_SSL
bool allow_ssl_try; /* Allowed to try SSL negotiation */
@ -435,7 +444,6 @@ struct pg_conn
* connection */
#endif
/* Buffer for current error message */
PQExpBufferData errorMessage; /* expansible string */
@ -505,7 +513,6 @@ extern void
pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
/* This lets gcc check the format string for consistency. */
__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3)));
extern int pqAddTuple(PGresult *res, PGresAttValue *tup);
extern void pqSaveMessageField(PGresult *res, char code,
const char *value);
extern void pqSaveParameterStatus(PGconn *conn, const char *name,
@ -558,6 +565,7 @@ extern int pqGets(PQExpBuffer buf, PGconn *conn);
extern int pqGets_append(PQExpBuffer buf, PGconn *conn);
extern int pqPuts(const char *s, PGconn *conn);
extern int pqGetnchar(char *s, size_t len, PGconn *conn);
extern int pqSkipnchar(size_t len, PGconn *conn);
extern int pqPutnchar(const char *s, size_t len, PGconn *conn);
extern int pqGetInt(int *result, size_t bytes, PGconn *conn);
extern int pqPutInt(int value, size_t bytes, PGconn *conn);