Improve efficiency of dblink by using libpq's new row processor API.

This patch provides a test case for libpq's row processor API.
contrib/dblink can deal with very large result sets by dumping them into
a tuplestore (which can spill to disk) --- but until now, the intermediate
storage of the query result in a PGresult meant memory bloat for any large
result.  Now we use a row processor to convert the data to tuple form and
dump it directly into the tuplestore.

A limitation is that this only works for plain dblink() queries, not
dblink_send_query() followed by dblink_get_result().  In the latter
case we don't know the desired tuple rowtype soon enough.  While hack
solutions to that are possible, a different user-level API would
probably be a better answer.

Kyotaro Horiguchi, reviewed by Marko Kreen and Tom Lane
This commit is contained in:
Tom Lane 2012-04-04 18:39:08 -04:00
parent 92785dac2e
commit 6f922ef88e
2 changed files with 374 additions and 83 deletions

View File

@ -63,12 +63,28 @@ typedef struct remoteConn
bool newXactForCursor; /* Opened a transaction for a cursor */ bool newXactForCursor; /* Opened a transaction for a cursor */
} remoteConn; } remoteConn;
typedef struct storeInfo
{
FunctionCallInfo fcinfo;
Tuplestorestate *tuplestore;
AttInMetadata *attinmeta;
MemoryContext tmpcontext;
char **cstrs;
} storeInfo;
/* /*
* Internal declarations * Internal declarations
*/ */
static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async); static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
static void prepTuplestoreResult(FunctionCallInfo fcinfo); static void prepTuplestoreResult(FunctionCallInfo fcinfo);
static void materializeResult(FunctionCallInfo fcinfo, PGresult *res); static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);
static void materializeQueryResult(FunctionCallInfo fcinfo,
PGconn *conn,
const char *conname,
const char *sql,
bool fail);
static int storeHandler(PGresult *res, const PGdataValue *columns,
const char **errmsgp, void *param);
static remoteConn *getConnectionByName(const char *name); static remoteConn *getConnectionByName(const char *name);
static HTAB *createConnHash(void); static HTAB *createConnHash(void);
static void createNewConnection(const char *name, remoteConn *rconn); static void createNewConnection(const char *name, remoteConn *rconn);
@ -629,100 +645,118 @@ dblink_get_result(PG_FUNCTION_ARGS)
static Datum static Datum
dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
{ {
char *msg; PGconn *volatile conn = NULL;
PGresult *res = NULL; volatile bool freeconn = false;
PGconn *conn = NULL;
char *connstr = NULL;
char *sql = NULL;
char *conname = NULL;
remoteConn *rconn = NULL;
bool fail = true; /* default to backward compatible */
bool freeconn = false;
prepTuplestoreResult(fcinfo); prepTuplestoreResult(fcinfo);
DBLINK_INIT; DBLINK_INIT;
if (!is_async) PG_TRY();
{ {
if (PG_NARGS() == 3) char *msg;
char *connstr = NULL;
char *sql = NULL;
char *conname = NULL;
remoteConn *rconn = NULL;
bool fail = true; /* default to backward compatible */
if (!is_async)
{ {
/* text,text,bool */ if (PG_NARGS() == 3)
DBLINK_GET_CONN;
sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
fail = PG_GETARG_BOOL(2);
}
else if (PG_NARGS() == 2)
{
/* text,text or text,bool */
if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
{
conn = pconn->conn;
sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
fail = PG_GETARG_BOOL(1);
}
else
{ {
/* text,text,bool */
DBLINK_GET_CONN; DBLINK_GET_CONN;
sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
fail = PG_GETARG_BOOL(2);
}
else if (PG_NARGS() == 2)
{
/* text,text or text,bool */
if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
{
conn = pconn->conn;
sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
fail = PG_GETARG_BOOL(1);
}
else
{
DBLINK_GET_CONN;
sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
}
}
else if (PG_NARGS() == 1)
{
/* text */
conn = pconn->conn;
sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
}
else
/* shouldn't happen */
elog(ERROR, "wrong number of arguments");
}
else /* is_async */
{
/* get async result */
if (PG_NARGS() == 2)
{
/* text,bool */
DBLINK_GET_NAMED_CONN;
fail = PG_GETARG_BOOL(1);
}
else if (PG_NARGS() == 1)
{
/* text */
DBLINK_GET_NAMED_CONN;
}
else
/* shouldn't happen */
elog(ERROR, "wrong number of arguments");
}
if (!conn)
DBLINK_CONN_NOT_AVAIL;
if (!is_async)
{
/* synchronous query, use efficient tuple collection method */
materializeQueryResult(fcinfo, conn, conname, sql, fail);
}
else
{
/* async result retrieval, do it the old way */
PGresult *res = PQgetResult(conn);
/* NULL means we're all done with the async results */
if (res)
{
if (PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK)
{
dblink_res_error(conname, res, "could not execute query",
fail);
/* if fail isn't set, we'll return an empty query result */
}
else
{
materializeResult(fcinfo, res);
}
} }
} }
else if (PG_NARGS() == 1)
{
/* text */
conn = pconn->conn;
sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
}
else
/* shouldn't happen */
elog(ERROR, "wrong number of arguments");
} }
else /* is_async */ PG_CATCH();
{ {
/* get async result */ /* if needed, close the connection to the database */
if (PG_NARGS() == 2) if (freeconn)
{ PQfinish(conn);
/* text,bool */ PG_RE_THROW();
DBLINK_GET_NAMED_CONN;
fail = PG_GETARG_BOOL(1);
}
else if (PG_NARGS() == 1)
{
/* text */
DBLINK_GET_NAMED_CONN;
}
else
/* shouldn't happen */
elog(ERROR, "wrong number of arguments");
} }
PG_END_TRY();
if (!conn) /* if needed, close the connection to the database */
DBLINK_CONN_NOT_AVAIL;
/* synchronous query, or async result retrieval */
if (!is_async)
res = PQexec(conn, sql);
else
{
res = PQgetResult(conn);
/* NULL means we're all done with the async results */
if (!res)
return (Datum) 0;
}
/* if needed, close the connection to the database and cleanup */
if (freeconn) if (freeconn)
PQfinish(conn); PQfinish(conn);
if (!res ||
(PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK))
{
dblink_res_error(conname, res, "could not execute query", fail);
return (Datum) 0;
}
materializeResult(fcinfo, res);
return (Datum) 0; return (Datum) 0;
} }
@ -890,6 +924,259 @@ materializeResult(FunctionCallInfo fcinfo, PGresult *res)
PG_END_TRY(); PG_END_TRY();
} }
/*
* Execute the given SQL command and store its results into a tuplestore
* to be returned as the result of the current function.
* This is equivalent to PQexec followed by materializeResult, but we make
* use of libpq's "row processor" API to reduce per-row overhead.
*/
static void
materializeQueryResult(FunctionCallInfo fcinfo,
PGconn *conn,
const char *conname,
const char *sql,
bool fail)
{
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
PGresult *volatile res = NULL;
storeInfo sinfo;
/* prepTuplestoreResult must have been called previously */
Assert(rsinfo->returnMode == SFRM_Materialize);
PG_TRY();
{
/* initialize storeInfo to empty */
memset(&sinfo, 0, sizeof(sinfo));
sinfo.fcinfo = fcinfo;
/* We'll collect tuples using storeHandler */
PQsetRowProcessor(conn, storeHandler, &sinfo);
res = PQexec(conn, sql);
/* We don't keep the custom row processor installed permanently */
PQsetRowProcessor(conn, NULL, NULL);
if (!res ||
(PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK))
{
/*
* dblink_res_error will clear the passed PGresult, so we need
* this ugly dance to avoid doing so twice during error exit
*/
PGresult *res1 = res;
res = NULL;
dblink_res_error(conname, res1, "could not execute query", fail);
/* if fail isn't set, we'll return an empty query result */
}
else if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
/*
* storeHandler didn't get called, so we need to convert the
* command status string to a tuple manually
*/
TupleDesc tupdesc;
AttInMetadata *attinmeta;
Tuplestorestate *tupstore;
HeapTuple tuple;
char *values[1];
MemoryContext oldcontext;
/*
* need a tuple descriptor representing one TEXT column to return
* the command status string as our result tuple
*/
tupdesc = CreateTemplateTupleDesc(1, false);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
TEXTOID, -1, 0);
attinmeta = TupleDescGetAttInMetadata(tupdesc);
oldcontext = MemoryContextSwitchTo(
rsinfo->econtext->ecxt_per_query_memory);
tupstore = tuplestore_begin_heap(true, false, work_mem);
rsinfo->setResult = tupstore;
rsinfo->setDesc = tupdesc;
MemoryContextSwitchTo(oldcontext);
values[0] = PQcmdStatus(res);
/* build the tuple and put it into the tuplestore. */
tuple = BuildTupleFromCStrings(attinmeta, values);
tuplestore_puttuple(tupstore, tuple);
PQclear(res);
}
else
{
Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
/* storeHandler should have created a tuplestore */
Assert(rsinfo->setResult != NULL);
PQclear(res);
}
}
PG_CATCH();
{
/* be sure to unset the custom row processor */
PQsetRowProcessor(conn, NULL, NULL);
/* be sure to release any libpq result we collected */
if (res)
PQclear(res);
/* and clear out any pending data in libpq */
while ((res = PQskipResult(conn)) != NULL)
PQclear(res);
PG_RE_THROW();
}
PG_END_TRY();
}
/*
* Custom row processor for materializeQueryResult.
* Prototype of this function must match PQrowProcessor.
*/
static int
storeHandler(PGresult *res, const PGdataValue *columns,
const char **errmsgp, void *param)
{
storeInfo *sinfo = (storeInfo *) param;
int nfields = PQnfields(res);
char **cstrs = sinfo->cstrs;
HeapTuple tuple;
char *pbuf;
int pbuflen;
int i;
MemoryContext oldcontext;
if (columns == NULL)
{
/* Prepare for new result set */
ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
TupleDesc tupdesc;
/*
* It's possible to get more than one result set if the query string
* contained multiple SQL commands. In that case, we follow PQexec's
* traditional behavior of throwing away all but the last result.
*/
if (sinfo->tuplestore)
tuplestore_end(sinfo->tuplestore);
sinfo->tuplestore = NULL;
/* get a tuple descriptor for our result type */
switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
{
case TYPEFUNC_COMPOSITE:
/* success */
break;
case TYPEFUNC_RECORD:
/* failed to determine actual type of RECORD */
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("function returning record called in context "
"that cannot accept type record")));
break;
default:
/* result type isn't composite */
elog(ERROR, "return type must be a row type");
break;
}
/* make sure we have a persistent copy of the tupdesc */
tupdesc = CreateTupleDescCopy(tupdesc);
/* check result and tuple descriptor have the same number of columns */
if (nfields != tupdesc->natts)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("remote query result rowtype does not match "
"the specified FROM clause rowtype")));
/* Prepare attinmeta for later data conversions */
sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
/* Create a new, empty tuplestore */
oldcontext = MemoryContextSwitchTo(
rsinfo->econtext->ecxt_per_query_memory);
sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
rsinfo->setResult = sinfo->tuplestore;
rsinfo->setDesc = tupdesc;
MemoryContextSwitchTo(oldcontext);
/*
* Set up sufficiently-wide string pointers array; this won't change
* in size so it's easy to preallocate.
*/
if (sinfo->cstrs)
pfree(sinfo->cstrs);
sinfo->cstrs = (char **) palloc(nfields * sizeof(char *));
/* Create short-lived memory context for data conversions */
if (!sinfo->tmpcontext)
sinfo->tmpcontext =
AllocSetContextCreate(CurrentMemoryContext,
"dblink temporary context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
return 1;
}
CHECK_FOR_INTERRUPTS();
/*
* Do the following work in a temp context that we reset after each tuple.
* This cleans up not only the data we have direct access to, but any
* cruft the I/O functions might leak.
*/
oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
/*
* The strings passed to us are not null-terminated, but the datatype
* input functions we're about to call require null termination. Copy the
* strings and add null termination. As a micro-optimization, allocate
* all the strings with one palloc.
*/
pbuflen = nfields; /* count the null terminators themselves */
for (i = 0; i < nfields; i++)
{
int len = columns[i].len;
if (len > 0)
pbuflen += len;
}
pbuf = (char *) palloc(pbuflen);
for (i = 0; i < nfields; i++)
{
int len = columns[i].len;
if (len < 0)
cstrs[i] = NULL;
else
{
cstrs[i] = pbuf;
memcpy(pbuf, columns[i].value, len);
pbuf += len;
*pbuf++ = '\0';
}
}
/* Convert row to a tuple, and add it to the tuplestore */
tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
tuplestore_puttuple(sinfo->tuplestore, tuple);
/* Clean up */
MemoryContextSwitchTo(oldcontext);
MemoryContextReset(sinfo->tmpcontext);
return 1;
}
/* /*
* List all open dblink connections by name. * List all open dblink connections by name.
* Returns an array of all connection names. * Returns an array of all connection names.

View File

@ -425,14 +425,6 @@ SELECT *
<refsect1> <refsect1>
<title>Notes</title> <title>Notes</title>
<para>
<function>dblink</> fetches the entire remote query result before
returning any of it to the local system. If the query is expected
to return a large number of rows, it's better to open it as a cursor
with <function>dblink_open</> and then fetch a manageable number
of rows at a time.
</para>
<para> <para>
A convenient way to use <function>dblink</> with predetermined A convenient way to use <function>dblink</> with predetermined
queries is to create a view. queries is to create a view.
@ -1432,6 +1424,18 @@ dblink_get_result(text connname [, bool fail_on_error]) returns setof record
sent, and one additional time to obtain an empty set result, sent, and one additional time to obtain an empty set result,
before the connection can be used again. before the connection can be used again.
</para> </para>
<para>
When using <function>dblink_send_query</> and
<function>dblink_get_result</>, <application>dblink</> fetches the entire
remote query result before returning any of it to the local query
processor. If the query returns a large number of rows, this can result
in transient memory bloat in the local session. It may be better to open
such a query as a cursor with <function>dblink_open</> and then fetch a
manageable number of rows at a time. Alternatively, use plain
<function>dblink()</>, which avoids memory bloat by spooling large result
sets to disk.
</para>
</refsect1> </refsect1>
<refsect1> <refsect1>