From 5af4855383494de44f3f62fcaa91e1ea5d0cc8eb Mon Sep 17 00:00:00 2001
From: Bruce Momjian
Date: Thu, 14 Jun 2001 16:49:03 +0000
Subject: [PATCH] Allow remote query execution (dblink)

Joe Conway
---
 contrib/README               |   4 +
 contrib/dblink/Makefile      |  57 +++++++
 contrib/dblink/README.dblink | 157 ++++++++++++++++++++
 contrib/dblink/dblink.c      | 276 +++++++++++++++++++++++++++++++++++
 contrib/dblink/dblink.h      |  70 +++++++++
 contrib/dblink/dblink.sql.in |   5 +
 6 files changed, 569 insertions(+)
 create mode 100644 contrib/dblink/Makefile
 create mode 100644 contrib/dblink/README.dblink
 create mode 100644 contrib/dblink/dblink.c
 create mode 100644 contrib/dblink/dblink.h
 create mode 100644 contrib/dblink/dblink.sql.in

diff --git a/contrib/README b/contrib/README
index 4321c89d50..6c46010b57 100644
--- a/contrib/README
+++ b/contrib/README
@@ -38,6 +38,10 @@ dbase -
 	Converts from dbase/xbase to PostgreSQL
 	by Ivan Baldo, lubaldo@adinet.com.uy
 
+dblink -
+	Allows remote query execution
+	by Joe Conway, joe.conway@mail.com
+
 earthdistance -
 	Operator for computing earth distance for two points
 	by Hal Snyder
diff --git a/contrib/dblink/Makefile b/contrib/dblink/Makefile
new file mode 100644
index 0000000000..6bb4bfe113
--- /dev/null
+++ b/contrib/dblink/Makefile
@@ -0,0 +1,57 @@
+subdir = contrib/dblink
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_builddir)/src/Makefile.shlib
+include_srcdir = $(top_builddir)/src/include
+
+# Build the dblink loadable module and the SQL script that registers it.
+NAME := dblink
+SONAME := $(NAME)$(DLSUFFIX)
+
+override CFLAGS += -I$(srcdir)
+override CFLAGS += -I$(include_srcdir)
+override CFLAGS += -I$(libpq_srcdir)
+override CFLAGS += $(CFLAGS_SL)
+
+OBJS = $(NAME).o
+
+# "depend" is a real file that is included below, so it is not listed here.
+.PHONY: all install installdirs uninstall clean distclean maintainer-clean dep
+
+all: $(OBJS) $(SONAME) $(NAME).sql
+
+# The header is a prerequisite so that editing it triggers a recompile;
+# $< still names the .c file because it is listed first.
+$(OBJS): $(NAME).c $(NAME).h
+	$(CC) -o $@ -c $(CFLAGS) $<
+
+# Objects go before the libraries that resolve their symbols.
+$(SONAME): $(OBJS)
+	$(LD) -o $@ -Bshareable $(OBJS) $(libpq)
+
+# Substitute the installed library path into the SQL template.
+$(NAME).sql: $(NAME).sql.in
+	sed -e 's:MODULE_PATHNAME:$(libdir)/contrib/$(SONAME):g' < $< > $@
+
+install: all installdirs
+	$(INSTALL_DATA) README.$(NAME) $(docdir)/contrib
+	$(INSTALL_DATA) $(NAME).sql $(datadir)/contrib
+	$(INSTALL_SHLIB) $(SONAME) $(libdir)/contrib
+
+installdirs:
+	$(mkinstalldirs) $(docdir)/contrib $(datadir)/contrib $(libdir)/contrib
+
+# Only plain files are installed, so a non-recursive rm is sufficient.
+uninstall:
+	rm -f $(docdir)/contrib/README.$(NAME) $(datadir)/contrib/$(NAME).sql $(libdir)/contrib/$(SONAME)
+
+# Remove only what this Makefile generates.
+clean distclean maintainer-clean:
+	rm -f $(SONAME) $(OBJS) $(NAME).sql depend
+
+depend dep:
+	$(CC) -MM -MG $(CFLAGS) *.c > depend
+
+ifeq (depend,$(wildcard depend))
+include depend
+endif
diff --git a/contrib/dblink/README.dblink b/contrib/dblink/README.dblink
new file mode 100644
index 0000000000..04219b7be2
--- /dev/null
+++ b/contrib/dblink/README.dblink
@@ -0,0 +1,157 @@
+/*
+ * dblink
+ *
+ * Functions returning results from a remote database
+ *
+ * Copyright (c) Joseph Conway , 2001;
+ *
+ * Permission to use, copy, modify, and distribute this software and its
+ * documentation for any purpose, without fee, and without a written agreement
+ * is hereby granted, provided that the above copyright notice and this
+ * paragraph and the following two paragraphs appear in all copies.
+ * + * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR + * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING + * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS + * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES, + * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY + * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS + * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO + * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. + * + */ + +Version 0.2 (29 May, 2001): + Function to test returning data set from remote database + Tested under Linux (Red Hat 6.2 and 7.0) and PostgreSQL 7.1 and 7.2devel + +Release Notes: + + Version 0.2 + - initial release + +Installation: + Place these files in a directory called 'dblink' under 'contrib' in the PostgreSQL source tree. Then run: + + make + make install + + You can use dblink.sql to create the functions in your database of choice, e.g. + + psql -U postgres template1 < dblink.sql + + This installs the following functions into database template1: + + dblink() - returns a pointer to results from remote query + dblink_tok() - extracts and returns individual field results + +Documentation +================================================================== +Name + +dblink -- Returns a pointer to a data set from a remote database + +Synopsis + +dblink(text connstr, text sql) + +Inputs + + connstr + + standard libpq format connection string, + e.g. "hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd" + + sql + + sql statement that you wish to execute on the remote host + e.g. 
"select * from pg_class" + +Outputs + + Returns setof int (pointer) + +Example usage + + select dblink('hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd','select f1, f2 from mytable'); + + +================================================================== + +Name + +dblink_tok -- Returns individual select field results from a dblink remote query + +Synopsis + +dblink_tok(int pointer, int fnumber) + +Inputs + + pointer + + a pointer returned by a call to dblink() + + fnumber + + the ordinal position (zero based) of the field to be returned from the dblink result set + +Outputs + + Returns text + +Example usage + + select dblink_tok(t1.dblink_p,0) as f1, dblink_tok(t1.dblink_p,1) as f2 + from (select dblink('hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd' + ,'select f1, f2 from mytable') as dblink_p) as t1; + + +================================================================== + +NOTE: If you need to provide selection criteria in a WHERE clause, it is necessary +to 'fake' a UNION, e.g. + + select + dblink_tok(t1.dblink_p,0) as f1 + ,dblink_tok(t1.dblink_p,1) as f2 + from + ( + select dblink( + 'hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd' + ,'select f1, f2 from mytable' + ) as dblink_p + union + select null,null where false + ) as t1 + where + f1 = 'mycriteria'; + +in order to work around an issue with the query optimizer. 
A more convenient way to approach +this problem is to create a view: + + create view myremotetable as + select + dblink_tok(t1.dblink_p,0) as f1 + ,dblink_tok(t1.dblink_p,1) as f2 + from + ( + select dblink( + 'hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd' + ,'select f1, f2 from mytable' + ) as dblink_p + union + select null,null where false + ) as t1; + +Then you can simply write: + + select f1,f2 from myremotetable where f1 = 'mycriteria'; + +================================================================== + +-- Joe Conway + diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c new file mode 100644 index 0000000000..b3c9c6cbbf --- /dev/null +++ b/contrib/dblink/dblink.c @@ -0,0 +1,276 @@ +/* + * dblink.c + * + * Functions returning results from a remote database + * + * Copyright (c) Joseph Conway , 2001; + * + * Permission to use, copy, modify, and distribute this software and its + * documentation for any purpose, without fee, and without a written agreement + * is hereby granted, provided that the above copyright notice and this + * paragraph and the following two paragraphs appear in all copies. + * + * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR + * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING + * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS + * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES, + * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY + * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS + * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO + * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. 
+ * + */ + +#include "dblink.h" + +PG_FUNCTION_INFO_V1(dblink); +Datum +dblink(PG_FUNCTION_ARGS) +{ + PGconn *conn = NULL; + PGresult *res = NULL; + dblink_results *results; + char *optstr; + char *sqlstatement; + char *curstr = "DECLARE mycursor CURSOR FOR "; + char *execstatement; + char *msg; + int ntuples = 0; + ReturnSetInfo *rsi; + + if (PG_ARGISNULL(0) || PG_ARGISNULL(1)) { + elog(ERROR, "dblink: NULL arguments are not permitted"); + } + + if (fcinfo->resultinfo == NULL || ! IsA(fcinfo->resultinfo, ReturnSetInfo)) { + elog(ERROR, "dblink: function called in context that does not accept a set result"); + } + + optstr = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0)))); + sqlstatement = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1)))); + + if (fcinfo->flinfo->fn_extra == NULL) { + + conn = PQconnectdb(optstr); + if (PQstatus(conn) == CONNECTION_BAD) + { + msg = PQerrorMessage(conn); + PQfinish(conn); + elog(ERROR, "dblink: connection error: %s", msg); + } + + res = PQexec(conn, "BEGIN"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + msg = PQerrorMessage(conn); + PQclear(res); + PQfinish(conn); + elog(ERROR, "dblink: begin error: %s", msg); + } + PQclear(res); + + execstatement = (char *) palloc(strlen(curstr) + strlen(sqlstatement) + 1); + if (execstatement != NULL) { + strcpy(execstatement, curstr); + strcat(execstatement, sqlstatement); + strcat(execstatement, "\0"); + } else { + elog(ERROR, "dblink: insufficient memory" ); + } + + res = PQexec(conn, execstatement); + if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) + { + msg = PQerrorMessage(conn); + PQclear(res); + PQfinish(conn); + elog(ERROR, "dblink: sql error: %s", msg); + } else { + /* + * got results, start fetching them + */ + PQclear(res); + + res = PQexec(conn, "FETCH ALL in mycursor"); + if (!res || PQresultStatus(res) != PGRES_TUPLES_OK) { + msg = PQerrorMessage(conn); + 
PQclear(res); + PQfinish(conn); + elog(ERROR, "dblink: sql error: %s", msg); + } + + ntuples = PQntuples(res); + + if (ntuples > 0) { + + results = init_dblink_results(fcinfo->flinfo->fn_mcxt); + results->tup_num = 0; + results->res = res; + res = NULL; + + (dblink_results *) fcinfo->flinfo->fn_extra = results; + + results = NULL; + results = fcinfo->flinfo->fn_extra; + + /* close the cursor */ + res = PQexec(conn, "CLOSE mycursor"); + PQclear(res); + + /* commit the transaction */ + res = PQexec(conn, "COMMIT"); + PQclear(res); + + /* close the connection to the database and cleanup */ + PQfinish(conn); + + rsi = (ReturnSetInfo *)fcinfo->resultinfo; + rsi->isDone = ExprMultipleResult; + + PG_RETURN_POINTER(results); + + } else { + + PQclear(res); + + /* close the cursor */ + res = PQexec(conn, "CLOSE mycursor"); + PQclear(res); + + /* commit the transaction */ + res = PQexec(conn, "COMMIT"); + PQclear(res); + + /* close the connection to the database and cleanup */ + PQfinish(conn); + + rsi = (ReturnSetInfo *)fcinfo->resultinfo; + rsi->isDone = ExprEndResult ; + + PG_RETURN_NULL(); + } + } + } else { + /* + * check for more results + */ + + results = fcinfo->flinfo->fn_extra; + results->tup_num++; + ntuples = PQntuples(results->res); + + if (results->tup_num < ntuples) { + + /* + * fetch them if available + */ + + rsi = (ReturnSetInfo *)fcinfo->resultinfo; + rsi->isDone = ExprMultipleResult; + + PG_RETURN_POINTER(results); + + } else { + + /* + * or if no more, clean things up + */ + + results = fcinfo->flinfo->fn_extra; + + PQclear(results->res); + + rsi = (ReturnSetInfo *)fcinfo->resultinfo; + rsi->isDone = ExprEndResult ; + + PG_RETURN_NULL(); + } + } + PG_RETURN_NULL(); +} + + +/* + * dblink_tok + * parse dblink output string + * return fldnum item (0 based) + * based on provided field separator + */ + +PG_FUNCTION_INFO_V1(dblink_tok); +Datum +dblink_tok(PG_FUNCTION_ARGS) +{ + dblink_results *results; + int fldnum; + text *result_text; + char *result; + int 
nfields = 0; + int text_len = 0; + + if (PG_ARGISNULL(0) || PG_ARGISNULL(1)) { + elog(ERROR, "dblink: NULL arguments are not permitted"); + } + + results = (dblink_results *) PG_GETARG_POINTER(0); + if (results == NULL) { + elog(ERROR, "dblink: function called with invalid result pointer"); + } + + fldnum = PG_GETARG_INT32(1); + if (fldnum < 0) { + elog(ERROR, "dblink: field number < 0 not permitted"); + } + + nfields = PQnfields(results->res); + if (fldnum > (nfields - 1)) { + elog(ERROR, "dblink: field number %d does not exist", fldnum); + } + + text_len = PQgetlength(results->res, results->tup_num, fldnum); + + result = (char *) palloc(text_len + 1); + + if (result != NULL) { + strcpy(result, PQgetvalue(results->res, results->tup_num, fldnum)); + strcat(result, "\0"); + } else { + elog(ERROR, "dblink: insufficient memory" ); + } + + result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result))); + + PG_RETURN_TEXT_P(result_text); +} + + +/* + * internal functions + */ + + +/* + * init_dblink_results + * - create an empty dblink_results data structure + */ +dblink_results * +init_dblink_results(MemoryContext fn_mcxt) +{ + MemoryContext oldcontext; + dblink_results *retval; + + oldcontext = MemoryContextSwitchTo(fn_mcxt); + + retval = (dblink_results *) palloc(sizeof(dblink_results)); + MemSet(retval, 0, sizeof(dblink_results)); + + retval->tup_num = -1; + retval->res = NULL; + + MemoryContextSwitchTo(oldcontext); + + return retval; +} diff --git a/contrib/dblink/dblink.h b/contrib/dblink/dblink.h new file mode 100644 index 0000000000..1b2a48e9fb --- /dev/null +++ b/contrib/dblink/dblink.h @@ -0,0 +1,70 @@ +/* + * dblink.h + * + * Functions returning results from a remote database + * + * Copyright (c) Joseph Conway , 2001; + * + * Permission to use, copy, modify, and distribute this software and its + * documentation for any purpose, without fee, and without a written agreement + * is hereby granted, provided that the above copyright notice and 
this + * paragraph and the following two paragraphs appear in all copies. + * + * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR + * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING + * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS + * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES, + * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY + * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS + * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO + * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. + * + */ + +#ifndef DBLINK_H +#define DBLINK_H + +#include +#include "postgres.h" +#include "libpq-fe.h" +#include "libpq-int.h" +#include "fmgr.h" +#include "access/tupdesc.h" +#include "executor/executor.h" +#include "nodes/nodes.h" +#include "nodes/execnodes.h" +#include "utils/builtins.h" + +/* + * This struct holds the results of the remote query. + * Use fn_extra to hold a pointer to it across calls + */ +typedef struct +{ + /* + * last tuple number accessed + */ + int tup_num; + + /* + * the actual query results + */ + PGresult *res; + +} dblink_results; + +/* + * External declarations + */ +extern Datum dblink(PG_FUNCTION_ARGS); +extern Datum dblink_tok(PG_FUNCTION_ARGS); + +/* + * Internal declarations + */ +dblink_results *init_dblink_results(MemoryContext fn_mcxt); + +#endif /* DBLINK_H */ diff --git a/contrib/dblink/dblink.sql.in b/contrib/dblink/dblink.sql.in new file mode 100644 index 0000000000..1615c00413 --- /dev/null +++ b/contrib/dblink/dblink.sql.in @@ -0,0 +1,5 @@ +CREATE FUNCTION dblink (text,text) RETURNS setof int + AS 'MODULE_PATHNAME','dblink' LANGUAGE 'c'; + +CREATE FUNCTION dblink_tok (int,int) RETURNS text + AS 'MODULE_PATHNAME','dblink_tok' LANGUAGE 'c';