Allow remote query execution (dblink)

Joe Conway
This commit is contained in:
Bruce Momjian 2001-06-14 16:49:03 +00:00
parent b33c66234e
commit 5af4855383
6 changed files with 560 additions and 0 deletions

View File

@ -38,6 +38,10 @@ dbase -
Converts from dbase/xbase to PostgreSQL
by Ivan Baldo, lubaldo@adinet.com.uy
dblink -
Allows remote query execution
by Joe Conway, joe.conway@mail.com
earthdistance -
Operator for computing earth distance for two points
by Hal Snyder <hal@vailsys.com>

48
contrib/dblink/Makefile Normal file
View File

@ -0,0 +1,48 @@
subdir = contrib/dblink
top_builddir = ../..
include $(top_builddir)/src/Makefile.global
include $(top_builddir)/src/Makefile.shlib
include_srcdir = $(top_builddir)/src/include
NAME := dblink
SONAME := $(NAME)$(DLSUFFIX)
override CFLAGS += -I$(srcdir)
override CFLAGS += -I$(include_srcdir)
override CFLAGS += -I$(libpq_srcdir)
override CFLAGS += $(CFLAGS_SL)
OBJS = $(NAME).o
all: $(OBJS) $(SONAME) $(NAME).sql
$(OBJS): $(NAME).c
$(CC) -o $@ -c $(CFLAGS) $<
$(SONAME): $(OBJS)
$(LD) -o $@ -Bshareable $(libpq) $<
$(NAME).sql: $(NAME).sql.in
sed -e 's:MODULE_PATHNAME:$(libdir)/contrib/$(SONAME):g' < $< > $@
install: all installdirs
$(INSTALL_DATA) README.$(NAME) $(docdir)/contrib
$(INSTALL_DATA) $(NAME).sql $(datadir)/contrib
$(INSTALL_SHLIB) $(SONAME) $(libdir)/contrib
installdirs:
$(mkinstalldirs) $(docdir)/contrib $(datadir)/contrib $(libdir)/contrib
uninstall:
rm -rf $(docdir)/contrib/README.$(NAME) $(datadir)/contrib/$(NAME).sql $(libdir)/contrib/$(SONAME)
clean distclean maintainer-clean:
rm -f $(SONAME) *.o *.sql
depend dep:
$(CC) -MM -MG $(CFLAGS) *.c > depend
ifeq (depend,$(wildcard depend))
include depend
endif

View File

@ -0,0 +1,157 @@
/*
* dblink
*
* Functions returning results from a remote database
*
* Copyright (c) Joseph Conway <joe.conway@mail.com>, 2001;
*
* Permission to use, copy, modify, and distribute this software and its
* documentation for any purpose, without fee, and without a written agreement
* is hereby granted, provided that the above copyright notice and this
* paragraph and the following two paragraphs appear in all copies.
*
* IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
* DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
* LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
* DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
* AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
* ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
* PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
*
*/
Version 0.2 (29 May, 2001):
Function to test returning data set from remote database
Tested under Linux (Red Hat 6.2 and 7.0) and PostgreSQL 7.1 and 7.2devel
Release Notes:
Version 0.2
- initial release
Installation:
Place these files in a directory called 'dblink' under 'contrib' in the PostgreSQL source tree. Then run:
make
make install
You can use dblink.sql to create the functions in your database of choice, e.g.
psql -U postgres template1 < dblink.sql
installs following functions into database template1:
dblink() - returns a pointer to results from remote query
dblink_tok() - extracts and returns individual field results
Documentation
==================================================================
Name
dblink -- Returns a pointer to a data set from a remote database
Synopsis
dblink(text connstr, text sql)
Inputs
connstr
standard libpq format connection srting,
e.g. "hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd"
sql
sql statement that you wish to execute on the remote host
e.g. "select * from pg_class"
Outputs
Returns setof int (pointer)
Example usage
select dblink('hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd','select f1, f2 from mytable');
==================================================================
Name
dblink_tok -- Returns individual select field results from a dblink remote query
Synopsis
dblink_tok(int pointer, int fnumber)
Inputs
pointer
a pointer returned by a call to dblink()
fnumber
the ordinal position (zero based) of the field to be returned from the dblink result set
Outputs
Returns text
Example usage
select dblink_tok(t1.dblink_p,0) as f1, dblink_tok(t1.dblink_p,1) as f2
from (select dblink('hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd'
,'select f1, f2 from mytable') as dblink_p) as t1;
==================================================================
NOTE: If you need to provide selection criteria in a WHERE clause, it is necessary
to 'fake' a UNION, e.g.
select
dblink_tok(t1.dblink_p,0) as f1
,dblink_tok(t1.dblink_p,1) as f2
from
(
select dblink(
'hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd'
,'select f1, f2 from mytable'
) as dblink_p
union
select null,null where false
) as t1
where
f1 = 'mycriteria';
in order to work around an issue with the query optimizer. A more convenient way to approach
this problem is to create a view:
create view myremotetable as
select
dblink_tok(t1.dblink_p,0) as f1
,dblink_tok(t1.dblink_p,1) as f2
from
(
select dblink(
'hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd'
,'select f1, f2 from mytable'
) as dblink_p
union
select null,null where false
) as t1;
Then you can simply write:
select f1,f2 from myremotetable where f1 = 'mycriteria';
==================================================================
-- Joe Conway

276
contrib/dblink/dblink.c Normal file
View File

@ -0,0 +1,276 @@
/*
* dblink.c
*
* Functions returning results from a remote database
*
* Copyright (c) Joseph Conway <joe.conway@mail.com>, 2001;
*
* Permission to use, copy, modify, and distribute this software and its
* documentation for any purpose, without fee, and without a written agreement
* is hereby granted, provided that the above copyright notice and this
* paragraph and the following two paragraphs appear in all copies.
*
* IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
* DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
* LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
* DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
* AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
* ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
* PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
*
*/
#include "dblink.h"
PG_FUNCTION_INFO_V1(dblink);
Datum
dblink(PG_FUNCTION_ARGS)
{
PGconn *conn = NULL;
PGresult *res = NULL;
dblink_results *results;
char *optstr;
char *sqlstatement;
char *curstr = "DECLARE mycursor CURSOR FOR ";
char *execstatement;
char *msg;
int ntuples = 0;
ReturnSetInfo *rsi;
if (PG_ARGISNULL(0) || PG_ARGISNULL(1)) {
elog(ERROR, "dblink: NULL arguments are not permitted");
}
if (fcinfo->resultinfo == NULL || ! IsA(fcinfo->resultinfo, ReturnSetInfo)) {
elog(ERROR, "dblink: function called in context that does not accept a set result");
}
optstr = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0))));
sqlstatement = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1))));
if (fcinfo->flinfo->fn_extra == NULL) {
conn = PQconnectdb(optstr);
if (PQstatus(conn) == CONNECTION_BAD)
{
msg = PQerrorMessage(conn);
PQfinish(conn);
elog(ERROR, "dblink: connection error: %s", msg);
}
res = PQexec(conn, "BEGIN");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
msg = PQerrorMessage(conn);
PQclear(res);
PQfinish(conn);
elog(ERROR, "dblink: begin error: %s", msg);
}
PQclear(res);
execstatement = (char *) palloc(strlen(curstr) + strlen(sqlstatement) + 1);
if (execstatement != NULL) {
strcpy(execstatement, curstr);
strcat(execstatement, sqlstatement);
strcat(execstatement, "\0");
} else {
elog(ERROR, "dblink: insufficient memory" );
}
res = PQexec(conn, execstatement);
if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK))
{
msg = PQerrorMessage(conn);
PQclear(res);
PQfinish(conn);
elog(ERROR, "dblink: sql error: %s", msg);
} else {
/*
* got results, start fetching them
*/
PQclear(res);
res = PQexec(conn, "FETCH ALL in mycursor");
if (!res || PQresultStatus(res) != PGRES_TUPLES_OK) {
msg = PQerrorMessage(conn);
PQclear(res);
PQfinish(conn);
elog(ERROR, "dblink: sql error: %s", msg);
}
ntuples = PQntuples(res);
if (ntuples > 0) {
results = init_dblink_results(fcinfo->flinfo->fn_mcxt);
results->tup_num = 0;
results->res = res;
res = NULL;
(dblink_results *) fcinfo->flinfo->fn_extra = results;
results = NULL;
results = fcinfo->flinfo->fn_extra;
/* close the cursor */
res = PQexec(conn, "CLOSE mycursor");
PQclear(res);
/* commit the transaction */
res = PQexec(conn, "COMMIT");
PQclear(res);
/* close the connection to the database and cleanup */
PQfinish(conn);
rsi = (ReturnSetInfo *)fcinfo->resultinfo;
rsi->isDone = ExprMultipleResult;
PG_RETURN_POINTER(results);
} else {
PQclear(res);
/* close the cursor */
res = PQexec(conn, "CLOSE mycursor");
PQclear(res);
/* commit the transaction */
res = PQexec(conn, "COMMIT");
PQclear(res);
/* close the connection to the database and cleanup */
PQfinish(conn);
rsi = (ReturnSetInfo *)fcinfo->resultinfo;
rsi->isDone = ExprEndResult ;
PG_RETURN_NULL();
}
}
} else {
/*
* check for more results
*/
results = fcinfo->flinfo->fn_extra;
results->tup_num++;
ntuples = PQntuples(results->res);
if (results->tup_num < ntuples) {
/*
* fetch them if available
*/
rsi = (ReturnSetInfo *)fcinfo->resultinfo;
rsi->isDone = ExprMultipleResult;
PG_RETURN_POINTER(results);
} else {
/*
* or if no more, clean things up
*/
results = fcinfo->flinfo->fn_extra;
PQclear(results->res);
rsi = (ReturnSetInfo *)fcinfo->resultinfo;
rsi->isDone = ExprEndResult ;
PG_RETURN_NULL();
}
}
PG_RETURN_NULL();
}
/*
* dblink_tok
* parse dblink output string
* return fldnum item (0 based)
* based on provided field separator
*/
PG_FUNCTION_INFO_V1(dblink_tok);
Datum
dblink_tok(PG_FUNCTION_ARGS)
{
dblink_results *results;
int fldnum;
text *result_text;
char *result;
int nfields = 0;
int text_len = 0;
if (PG_ARGISNULL(0) || PG_ARGISNULL(1)) {
elog(ERROR, "dblink: NULL arguments are not permitted");
}
results = (dblink_results *) PG_GETARG_POINTER(0);
if (results == NULL) {
elog(ERROR, "dblink: function called with invalid result pointer");
}
fldnum = PG_GETARG_INT32(1);
if (fldnum < 0) {
elog(ERROR, "dblink: field number < 0 not permitted");
}
nfields = PQnfields(results->res);
if (fldnum > (nfields - 1)) {
elog(ERROR, "dblink: field number %d does not exist", fldnum);
}
text_len = PQgetlength(results->res, results->tup_num, fldnum);
result = (char *) palloc(text_len + 1);
if (result != NULL) {
strcpy(result, PQgetvalue(results->res, results->tup_num, fldnum));
strcat(result, "\0");
} else {
elog(ERROR, "dblink: insufficient memory" );
}
result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result)));
PG_RETURN_TEXT_P(result_text);
}
/*
* internal functions
*/
/*
* init_dblink_results
* - create an empty dblink_results data structure
*/
dblink_results *
init_dblink_results(MemoryContext fn_mcxt)
{
MemoryContext oldcontext;
dblink_results *retval;
oldcontext = MemoryContextSwitchTo(fn_mcxt);
retval = (dblink_results *) palloc(sizeof(dblink_results));
MemSet(retval, 0, sizeof(dblink_results));
retval->tup_num = -1;
retval->res = NULL;
MemoryContextSwitchTo(oldcontext);
return retval;
}

70
contrib/dblink/dblink.h Normal file
View File

@ -0,0 +1,70 @@
/*
* dblink.h
*
* Functions returning results from a remote database
*
* Copyright (c) Joseph Conway <joe.conway@mail.com>, 2001;
*
* Permission to use, copy, modify, and distribute this software and its
* documentation for any purpose, without fee, and without a written agreement
* is hereby granted, provided that the above copyright notice and this
* paragraph and the following two paragraphs appear in all copies.
*
* IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
* DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
* LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
* DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
* AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
* ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
* PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
*
*/
#ifndef DBLINK_H
#define DBLINK_H
#include <string.h>
#include "postgres.h"
#include "libpq-fe.h"
#include "libpq-int.h"
#include "fmgr.h"
#include "access/tupdesc.h"
#include "executor/executor.h"
#include "nodes/nodes.h"
#include "nodes/execnodes.h"
#include "utils/builtins.h"
/*
* This struct holds the results of the remote query.
* Use fn_extra to hold a pointer to it across calls
*/
typedef struct
{
/*
* last tuple number accessed
*/
int tup_num;
/*
* the actual query results
*/
PGresult *res;
} dblink_results;
/*
* External declarations
*/
extern Datum dblink(PG_FUNCTION_ARGS);
extern Datum dblink_tok(PG_FUNCTION_ARGS);
/*
* Internal declarations
*/
dblink_results *init_dblink_results(MemoryContext fn_mcxt);
#endif /* DBLINK_H */

View File

@ -0,0 +1,5 @@
CREATE FUNCTION dblink (text,text) RETURNS setof int
AS 'MODULE_PATHNAME','dblink' LANGUAGE 'c';
CREATE FUNCTION dblink_tok (int,int) RETURNS text
AS 'MODULE_PATHNAME','dblink_tok' LANGUAGE 'c';