diff --git a/contrib/README b/contrib/README index 2b0085483e..0d3b04fd1f 100644 --- a/contrib/README +++ b/contrib/README @@ -48,7 +48,7 @@ dbase - dblink - Allows remote query execution - by Joe Conway + by Joe Conway dbmirror - Replication server @@ -73,7 +73,7 @@ fulltextindex - fuzzystrmatch - Levenshtein, metaphone, and soundex fuzzy string matching - by Joe Conway , Joel Burton + by Joe Conway , Joel Burton intagg - Integer aggregator diff --git a/contrib/dblink/README.dblink b/contrib/dblink/README.dblink index 8e6adf069f..f304b7729d 100644 --- a/contrib/dblink/README.dblink +++ b/contrib/dblink/README.dblink @@ -3,7 +3,9 @@ * * Functions returning results from a remote database * - * Copyright (c) Joseph Conway , 2001, 2002, + * Joe Conway + * + * Copyright (c) 2001, 2002 by PostgreSQL Global Development Group * ALL RIGHTS RESERVED; * * Permission to use, copy, modify, and distribute this software and its @@ -25,13 +27,36 @@ * */ - -Version 0.4 (7 April, 2002): - Functions allowing remote database INSERT/UPDATE/DELETE/SELECT, and - various utility functions. - Tested under Linux (Red Hat 7.2) and PostgreSQL 7.2 and 7.3devel +Version 0.5 (25 August, 2002): + Major overhaul to work with new backend "table function" capability. Removed + dblink_strtok() and dblink_replace() functions because they are now + available as backend functions (split() and replace() respectively). + Tested under Linux (Red Hat 7.3) and PostgreSQL 7.3devel. This version + is no longer backwards portable to PostgreSQL 7.2. Release Notes: + Version 0.5 + - dblink now supports use directly as a table function; this is the new + preferred usage going forward + - Use of dblink_tok is now deprecated; original form of dblink is also + deprecated. They _will_ be removed in the next version. + - dblink_last_oid is also deprecated; use dblink_exec() which returns + the command status as a single row, single column result. + - Original dblink, dblink_tok, and dblink_last_oid are commented out in + dblink.sql; remove the comments to use the deprecated functions. + - dblink_strtok() and dblink_replace() functions were removed. Use + split() and replace() respectively (new backend functions in + PostgreSQL 7.3) instead. + - New functions: dblink_exec() for non-SELECT queries; dblink_connect() + opens connection that persists for duration of a backend; + dblink_disconnect() closes a persistent connection; dblink_open() + opens a cursor; dblink_fetch() fetches results from an open cursor. + dblink_close() closes a cursor. + - New test suite: dblink_check.sh, dblink.test.sql, + dblink.test.expected.out. Execute dblink_check.sh from the same + directory as the other two files. Output is dblink.test.out and + dblink.test.diff. Note that dblink.test.sql is a good source + of example usage. Version 0.4 - removed cursor wrap around input sql to allow for remote @@ -59,16 +84,48 @@ Installation: installs following functions into database template1: - dblink(text,text) RETURNS setof int - - returns a resource id for results from remote query - dblink_tok(int,int) RETURNS text - - extracts and returns individual field results - dblink_strtok(text,text,int) RETURNS text - - extracts and returns individual token from delimited text + connection + ------------ + dblink_connect(text) RETURNS text + - opens a connection that will persist for duration of current + backend or until it is disconnected + dblink_disconnect() RETURNS text + - disconnects a persistent connection + + cursor + ------------ + dblink_open(text,text) RETURNS text + - opens a cursor using connection already opened with dblink_connect() + that will persist for duration of current backend or until it is + closed + dblink_fetch(text, int) RETURNS setof record + - fetches data from an already opened cursor + dblink_close(text) RETURNS text + - closes a cursor + + query + ------------ + dblink(text,text) RETURNS setof record + - returns a set of results from remote SELECT query + (Note: comment out in dblink.sql to use deprecated version) + dblink(text) RETURNS setof record + - returns a set of results from remote SELECT query, using connection + already opened with dblink_connect() + + execute + ------------ + dblink_exec(text, text) RETURNS text + - executes an INSERT/UPDATE/DELETE query remotely + dblink_exec(text) RETURNS text + - executes an INSERT/UPDATE/DELETE query remotely, using connection + already opened with dblink_connect() + + misc + ------------ + dblink_current_query() RETURNS text + - returns the current query string dblink_get_pkey(text) RETURNS setof text - returns the field names of a relation's primary key fields - dblink_last_oid(int) RETURNS oid - - returns the last inserted oid dblink_build_sql_insert(text,int2vector,int2,_text,_text) RETURNS text - builds an insert statement using a local tuple, replacing the selection key field values with alternate supplied values @@ -78,338 +135,30 @@ Installation: dblink_build_sql_update(text,int2vector,int2,_text,_text) RETURNS text - builds an update statement using a local tuple, replacing the selection key field values with alternate supplied values - dblink_current_query() RETURNS text - - returns the current query string - dblink_replace(text,text,text) RETURNS text - - replace all occurences of substring-a in the input-string - with substring-b -Documentation -================================================================== -Name + Not installed by default + deprecated + ------------ + dblink(text,text) RETURNS setof int + - *DEPRECATED* returns a resource id for results from remote query + (Note: must uncomment in dblink.sql to use) + dblink_tok(int,int) RETURNS text + - *DEPRECATED* extracts and returns individual field results; used + only in conjunction with the *DEPRECATED* form of dblink + (Note: must uncomment in dblink.sql to use) + dblink_last_oid(int) RETURNS oid + - *DEPRECATED* returns the last inserted oid -dblink -- Returns a resource id for a data set from a remote database - -Synopsis - -dblink(text connstr, text sql) - -Inputs - - connstr - - standard libpq format connection srting, - e.g. "hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd" - - sql - - sql statement that you wish to execute on the remote host - e.g. "select * from pg_class" - -Outputs - - Returns setof int (res_id) - -Example usage - - select dblink('hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd' - ,'select f1, f2 from mytable'); +Documentation: + See the following files: + doc/connection + doc/cursor + doc/query + doc/execute + doc/misc + doc/deprecated ================================================================== - -Name - -dblink_tok -- Returns individual select field results from a dblink remote query - -Synopsis - -dblink_tok(int res_id, int fnumber) - -Inputs - - res_id - - a resource id returned by a call to dblink() - - fnumber - - the ordinal position (zero based) of the field to be returned from the dblink result set - -Outputs - - Returns text - -Example usage - - select dblink_tok(t1.dblink_p,0) as f1, dblink_tok(t1.dblink_p,1) as f2 - from (select dblink('hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd' - ,'select f1, f2 from mytable') as dblink_p) as t1; - - -================================================================== - -A more convenient way to use dblink may be to create a view: - - create view myremotetable as - select dblink_tok(t1.dblink_p,0) as f1, dblink_tok(t1.dblink_p,1) as f2 - from (select dblink('hostaddr=127.0.0.1 port=5432 dbname=template1 user=postgres password=postgres' - ,'select proname, prosrc from pg_proc') as dblink_p) as t1; - -Then you can simply write: - - select f1, f2 from myremotetable where f1 like 'bytea%'; - -================================================================== -Name - -dblink_strtok -- Extracts and returns individual token from delimited text - -Synopsis - -dblink_strtok(text inputstring, text delimiter, int posn) RETURNS text - -Inputs - - inputstring - - any string you want to parse a token out of; - e.g. 'f=1&g=3&h=4' - - delimiter - - a single character to use as the delimiter; - e.g. '&' or '=' - - posn - - the position of the token of interest, 0 based; - e.g. 1 - -Outputs - - Returns text - -Example usage - -test=# select dblink_strtok(dblink_strtok('f=1&g=3&h=4','&',1),'=',1); - dblink_strtok ---------------- - 3 -(1 row) - -================================================================== -Name - -dblink_get_pkey -- returns the field names of a relation's primary - key fields - -Synopsis - -dblink_get_pkey(text relname) RETURNS setof text - -Inputs - - relname - - any relation name; - e.g. 'foobar' - -Outputs - - Returns setof text -- one row for each primary key field, in order of - precedence - -Example usage - -test=# select dblink_get_pkey('foobar'); - dblink_get_pkey ------------------ - f1 - f2 - f3 - f4 - f5 -(5 rows) - - -================================================================== -Name - -dblink_last_oid -- Returns last inserted oid - -Synopsis - -dblink_last_oid(int res_id) RETURNS oid - -Inputs - - res_id - - any resource id returned by dblink function; - -Outputs - - Returns oid of last inserted tuple - -Example usage - -test=# select dblink_last_oid(dblink('hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd' - ,'insert into mytable (f1, f2) values (1,2)')); - - dblink_last_oid ----------------- - 16553 -(1 row) - - -================================================================== -Name - -dblink_build_sql_insert -- builds an insert statement using a local - tuple, replacing the selection key field - values with alternate supplied values -dblink_build_sql_delete -- builds a delete statement using supplied - values for selection key field values -dblink_build_sql_update -- builds an update statement using a local - tuple, replacing the selection key field - values with alternate supplied values - - -Synopsis - -dblink_build_sql_insert(text relname - ,int2vector primary_key_attnums - ,int2 num_primary_key_atts - ,_text src_pk_att_vals_array - ,_text tgt_pk_att_vals_array) RETURNS text -dblink_build_sql_delete(text relname - ,int2vector primary_key_attnums - ,int2 num_primary_key_atts - ,_text tgt_pk_att_vals_array) RETURNS text -dblink_build_sql_update(text relname - ,int2vector primary_key_attnums - ,int2 num_primary_key_atts - ,_text src_pk_att_vals_array - ,_text tgt_pk_att_vals_array) RETURNS text - -Inputs - - relname - - any relation name; - e.g. 'foobar' - - primary_key_attnums - - vector of primary key attnums (1 based, see pg_index.indkey); - e.g. '1 2' - - num_primary_key_atts - - number of primary key attnums in the vector; e.g. 2 - - src_pk_att_vals_array - - array of primary key values, used to look up the local matching - tuple, the values of which are then used to construct the SQL - statement - - tgt_pk_att_vals_array - - array of primary key values, used to replace the local tuple - values in the SQL statement - -Outputs - - Returns text -- requested SQL statement - -Example usage - -test=# select dblink_build_sql_insert('foo','1 2',2,'{"1", "a"}','{"1", "b''a"}'); - dblink_build_sql_insert --------------------------------------------------- - INSERT INTO foo(f1,f2,f3) VALUES('1','b''a','1') -(1 row) - -test=# select dblink_build_sql_delete('MyFoo','1 2',2,'{"1", "b"}'); - dblink_build_sql_delete ---------------------------------------------- - DELETE FROM "MyFoo" WHERE f1='1' AND f2='b' -(1 row) - -test=# select dblink_build_sql_update('foo','1 2',2,'{"1", "a"}','{"1", "b"}'); - dblink_build_sql_update -------------------------------------------------------------- - UPDATE foo SET f1='1',f2='b',f3='1' WHERE f1='1' AND f2='b' -(1 row) - - -================================================================== -Name - -dblink_current_query -- returns the current query string - -Synopsis - -dblink_current_query () RETURNS text - -Inputs - - None - -Outputs - - Returns text -- a copy of the currently executing query - -Example usage - -test=# select dblink_current_query() from (select dblink('dbname=template1','select oid, proname from pg_proc where proname = ''byteacat''') as f1) as t1; - dblink_current_query ------------------------------------------------------------------------------------------------------------------------------------------------------ - select dblink_current_query() from (select dblink('dbname=template1','select oid, proname from pg_proc where proname = ''byteacat''') as f1) as t1; -(1 row) - - -================================================================== -Name - -dblink_replace -- replace all occurences of substring-a in the - input-string with substring-b - -Synopsis - -dblink_replace(text input-string, text substring-a, text substring-b) RETURNS text - -Inputs - - input-string - - the starting string, before replacement of substring-a - - substring-a - - the substring to find and replace - - substring-b - - the substring to be substituted in place of substring-a - -Outputs - - Returns text -- a copy of the starting string, but with all occurences of - substring-a replaced with substring-b - -Example usage - -test=# select dblink_replace('12345678901234567890','56','hello'); - dblink_replace ----------------------------- - 1234hello78901234hello7890 -(1 row) - -================================================================== - - -- Joe Conway diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index 0401e06f4f..a6ede5ae1c 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -3,7 +3,9 @@ * * Functions returning results from a remote database * - * Copyright (c) Joseph Conway , 2001, 2002, + * Joe Conway + * + * Copyright (c) 2001, 2002 by PostgreSQL Global Development Group * ALL RIGHTS RESERVED; * * Permission to use, copy, modify, and distribute this software and its @@ -25,16 +27,39 @@ * */ -#include "dblink.h" +#include +#include "postgres.h" +#include "libpq-fe.h" +#include "libpq-int.h" +#include "fmgr.h" +#include "funcapi.h" +#include "access/tupdesc.h" +#include "access/heapam.h" +#include "catalog/catname.h" +#include "catalog/namespace.h" +#include "catalog/pg_index.h" +#include "catalog/pg_type.h" +#include "executor/executor.h" +#include "executor/spi.h" +#include "lib/stringinfo.h" +#include "nodes/nodes.h" +#include "nodes/execnodes.h" +#include "nodes/pg_list.h" +#include "parser/parse_type.h" +#include "tcop/tcopprot.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" +#include "utils/array.h" +#include "utils/lsyscache.h" +#include "utils/syscache.h" +#include "dblink.h" /* * Internal declarations */ static dblink_results *init_dblink_results(MemoryContext fn_mcxt); -static dblink_array_results *init_dblink_array_results(MemoryContext fn_mcxt); static char **get_pkey_attnames(Oid relid, int16 *numatts); -static char *get_strtok(char *fldtext, char *fldsep, int fldnum); static char *get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals); static char *get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals); static char *get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals); @@ -43,14 +68,593 @@ static char *quote_ident_cstr(char *rawstr); static int16 get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key); static HeapTuple get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals); static Oid get_relid_from_relname(text *relname_text); -static dblink_results *get_res_ptr(int32 res_id_index); +static dblink_results *get_res_ptr(int32 res_id_index); static void append_res_ptr(dblink_results *results); static void remove_res_ptr(dblink_results *results); +static TupleDesc pgresultGetTupleDesc(PGresult *res); /* Global */ -List *res_id = NIL; -int res_id_index = 0; +List *res_id = NIL; +int res_id_index = 0; +PGconn *persistent_conn = NULL; +#define GET_TEXT(cstrp) DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(cstrp))) +#define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp))) +#define xpfree(var_) \ + do { \ + if (var_ != NULL) \ + { \ + pfree(var_); \ + var_ = NULL; \ + } \ + } while (0) + + +/* + * Create a persistent connection to another database + */ +PG_FUNCTION_INFO_V1(dblink_connect); +Datum +dblink_connect(PG_FUNCTION_ARGS) +{ + char *connstr = GET_STR(PG_GETARG_TEXT_P(0)); + char *msg; + text *result_text; + MemoryContext oldcontext; + + if (persistent_conn != NULL) + PQfinish(persistent_conn); + + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + persistent_conn = PQconnectdb(connstr); + MemoryContextSwitchTo(oldcontext); + + if (PQstatus(persistent_conn) == CONNECTION_BAD) + { + msg = pstrdup(PQerrorMessage(persistent_conn)); + PQfinish(persistent_conn); + persistent_conn = NULL; + elog(ERROR, "dblink_connect: connection error: %s", msg); + } + + result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK"))); + PG_RETURN_TEXT_P(result_text); +} + +/* + * Clear a persistent connection to another database + */ +PG_FUNCTION_INFO_V1(dblink_disconnect); +Datum +dblink_disconnect(PG_FUNCTION_ARGS) +{ + text *result_text; + + if (persistent_conn != NULL) + PQfinish(persistent_conn); + + persistent_conn = NULL; + + result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK"))); + PG_RETURN_TEXT_P(result_text); +} + +/* + * opens a cursor using a persistent connection + */ +PG_FUNCTION_INFO_V1(dblink_open); +Datum +dblink_open(PG_FUNCTION_ARGS) +{ + char *msg; + PGresult *res = NULL; + PGconn *conn = NULL; + text *result_text; + char *curname = GET_STR(PG_GETARG_TEXT_P(0)); + char *sql = GET_STR(PG_GETARG_TEXT_P(1)); + StringInfo str = makeStringInfo(); + + if (persistent_conn != NULL) + conn = persistent_conn; + else + elog(ERROR, "dblink_open: no connection available"); + + res = PQexec(conn, "BEGIN"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + msg = pstrdup(PQerrorMessage(conn)); + PQclear(res); + + PQfinish(conn); + persistent_conn = NULL; + + elog(ERROR, "dblink_open: begin error: %s", msg); + } + PQclear(res); + + appendStringInfo(str, "DECLARE %s CURSOR FOR %s", quote_ident_cstr(curname), sql); + res = PQexec(conn, str->data); + if (!res || + (PQresultStatus(res) != PGRES_COMMAND_OK && + PQresultStatus(res) != PGRES_TUPLES_OK)) + { + msg = pstrdup(PQerrorMessage(conn)); + + PQclear(res); + + PQfinish(conn); + persistent_conn = NULL; + + elog(ERROR, "dblink: sql error: %s", msg); + } + + result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK"))); + PG_RETURN_TEXT_P(result_text); +} + +/* + * closes a cursor + */ +PG_FUNCTION_INFO_V1(dblink_close); +Datum +dblink_close(PG_FUNCTION_ARGS) +{ + PGconn *conn = NULL; + PGresult *res = NULL; + char *curname = GET_STR(PG_GETARG_TEXT_P(0)); + StringInfo str = makeStringInfo(); + text *result_text; + char *msg; + + if (persistent_conn != NULL) + conn = persistent_conn; + else + elog(ERROR, "dblink_close: no connection available"); + + appendStringInfo(str, "CLOSE %s", quote_ident_cstr(curname)); + + /* close the cursor */ + res = PQexec(conn, str->data); + if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) + { + msg = pstrdup(PQerrorMessage(conn)); + PQclear(res); + + PQfinish(persistent_conn); + persistent_conn = NULL; + + elog(ERROR, "dblink_close: sql error: %s", msg); + } + + PQclear(res); + + /* commit the transaction */ + res = PQexec(conn, "COMMIT"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + msg = pstrdup(PQerrorMessage(conn)); + PQclear(res); + + PQfinish(persistent_conn); + persistent_conn = NULL; + + elog(ERROR, "dblink_close: commit error: %s", msg); + } + PQclear(res); + + result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK"))); + PG_RETURN_TEXT_P(result_text); +} + +/* + * Fetch results from an open cursor + */ +PG_FUNCTION_INFO_V1(dblink_fetch); +Datum +dblink_fetch(PG_FUNCTION_ARGS) +{ + FuncCallContext *funcctx; + TupleDesc tupdesc = NULL; + int call_cntr; + int max_calls; + TupleTableSlot *slot; + AttInMetadata *attinmeta; + char *msg; + PGresult *res = NULL; + MemoryContext oldcontext; + + /* stuff done only on the first call of the function */ + if(SRF_IS_FIRSTCALL()) + { + Oid functypeid; + char functyptype; + Oid funcid = fcinfo->flinfo->fn_oid; + PGconn *conn = NULL; + StringInfo str = makeStringInfo(); + char *curname = GET_STR(PG_GETARG_TEXT_P(0)); + int howmany = PG_GETARG_INT32(1); + + /* create a function context for cross-call persistence */ + funcctx = SRF_FIRSTCALL_INIT(); + + /* switch to memory context appropriate for multiple function calls */ + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + if (persistent_conn != NULL) + conn = persistent_conn; + else + elog(ERROR, "dblink_fetch: no connection available"); + + appendStringInfo(str, "FETCH %d FROM %s", howmany, quote_ident_cstr(curname)); + + res = PQexec(conn, str->data); + if (!res || + (PQresultStatus(res) != PGRES_COMMAND_OK && + PQresultStatus(res) != PGRES_TUPLES_OK)) + { + msg = pstrdup(PQerrorMessage(conn)); + PQclear(res); + + PQfinish(persistent_conn); + persistent_conn = NULL; + + elog(ERROR, "dblink_fetch: sql error: %s", msg); + } + else if (PQresultStatus(res) == PGRES_COMMAND_OK) + { + /* cursor does not exist - closed already or bad name */ + PQclear(res); + elog(ERROR, "dblink_fetch: cursor %s does not exist", quote_ident_cstr(curname)); + } + + funcctx->max_calls = PQntuples(res); + + /* got results, keep track of them */ + funcctx->user_fctx = res; + + /* fast track when no results */ + if (funcctx->max_calls < 1) + SRF_RETURN_DONE(funcctx); + + /* check typtype to see if we have a predetermined return type */ + functypeid = get_func_rettype(funcid); + functyptype = get_typtype(functypeid); + + if (functyptype == 'c') + tupdesc = TypeGetTupleDesc(functypeid, NIL); + else if (functyptype == 'p' && functypeid == RECORDOID) + tupdesc = pgresultGetTupleDesc(res); + else if (functyptype == 'b') + elog(ERROR, "dblink_fetch: invalid kind of return type specified for function"); + else + elog(ERROR, "dblink_fetch: unknown kind of return type specified for function"); + + /* store needed metadata for subsequent calls */ + slot = TupleDescGetSlot(tupdesc); + funcctx->slot = slot; + attinmeta = TupleDescGetAttInMetadata(tupdesc); + funcctx->attinmeta = attinmeta; + + MemoryContextSwitchTo(oldcontext); + } + + /* stuff done on every call of the function */ + funcctx = SRF_PERCALL_SETUP(); + + /* + * initialize per-call variables + */ + call_cntr = funcctx->call_cntr; + max_calls = funcctx->max_calls; + + slot = funcctx->slot; + + res = (PGresult *) funcctx->user_fctx; + attinmeta = funcctx->attinmeta; + tupdesc = attinmeta->tupdesc; + + if (call_cntr < max_calls) /* do when there is more left to send */ + { + char **values; + HeapTuple tuple; + Datum result; + int i; + int nfields = PQnfields(res); + + values = (char **) palloc(nfields * sizeof(char *)); + for (i = 0; i < nfields; i++) + { + if (PQgetisnull(res, call_cntr, i) == 0) + values[i] = PQgetvalue(res, call_cntr, i); + else + values[i] = NULL; + } + + /* build the tuple */ + tuple = BuildTupleFromCStrings(attinmeta, values); + + /* make the tuple into a datum */ + result = TupleGetDatum(slot, tuple); + + SRF_RETURN_NEXT(funcctx, result); + } + else /* do when there is no more left */ + { + PQclear(res); + SRF_RETURN_DONE(funcctx); + } +} + +/* + * Note: this is the new preferred version of dblink + */ +PG_FUNCTION_INFO_V1(dblink_record); +Datum +dblink_record(PG_FUNCTION_ARGS) +{ + FuncCallContext *funcctx; + TupleDesc tupdesc = NULL; + int call_cntr; + int max_calls; + TupleTableSlot *slot; + AttInMetadata *attinmeta; + char *msg; + PGresult *res = NULL; + bool is_sql_cmd = false; + char *sql_cmd_status = NULL; + MemoryContext oldcontext; + + /* stuff done only on the first call of the function */ + if(SRF_IS_FIRSTCALL()) + { + Oid functypeid; + char functyptype; + Oid funcid = fcinfo->flinfo->fn_oid; + PGconn *conn = NULL; + char *connstr = NULL; + char *sql = NULL; + + /* create a function context for cross-call persistence */ + funcctx = SRF_FIRSTCALL_INIT(); + + /* switch to memory context appropriate for multiple function calls */ + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + if (fcinfo->nargs == 2) + { + connstr = GET_STR(PG_GETARG_TEXT_P(0)); + sql = GET_STR(PG_GETARG_TEXT_P(1)); + + conn = PQconnectdb(connstr); + if (PQstatus(conn) == CONNECTION_BAD) + { + msg = pstrdup(PQerrorMessage(conn)); + PQfinish(conn); + elog(ERROR, "dblink: connection error: %s", msg); + } + } + else if (fcinfo->nargs == 1) + { + sql = GET_STR(PG_GETARG_TEXT_P(0)); + + if (persistent_conn != NULL) + conn = persistent_conn; + else + elog(ERROR, "dblink: no connection available"); + } + else + elog(ERROR, "dblink: wrong number of arguments"); + + res = PQexec(conn, sql); + if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) + { + msg = pstrdup(PQerrorMessage(conn)); + PQclear(res); + PQfinish(conn); + if (fcinfo->nargs == 1) + persistent_conn = NULL; + + elog(ERROR, "dblink: sql error: %s", msg); + } + else + { + if (PQresultStatus(res) == PGRES_COMMAND_OK) + { + is_sql_cmd = true; + + /* need a tuple descriptor representing one TEXT column */ + tupdesc = CreateTemplateTupleDesc(1, WITHOUTOID); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", + TEXTOID, -1, 0, false); + + /* + * and save a copy of the command status string to return + * as our result tuple + */ + sql_cmd_status = PQcmdStatus(res); + funcctx->max_calls = 1; + } + else + funcctx->max_calls = PQntuples(res); + + /* got results, keep track of them */ + funcctx->user_fctx = res; + + /* if needed, close the connection to the database and cleanup */ + if (fcinfo->nargs == 2) + PQfinish(conn); + } + + /* fast track when no results */ + if (funcctx->max_calls < 1) + SRF_RETURN_DONE(funcctx); + + /* check typtype to see if we have a predetermined return type */ + functypeid = get_func_rettype(funcid); + functyptype = get_typtype(functypeid); + + if (!is_sql_cmd) + { + if (functyptype == 'c') + tupdesc = TypeGetTupleDesc(functypeid, NIL); + else if (functyptype == 'p' && functypeid == RECORDOID) + tupdesc = pgresultGetTupleDesc(res); + else if (functyptype == 'b') + elog(ERROR, "Invalid kind of return type specified for function"); + else + elog(ERROR, "Unknown kind of return type specified for function"); + } + + /* store needed metadata for subsequent calls */ + slot = TupleDescGetSlot(tupdesc); + funcctx->slot = slot; + attinmeta = TupleDescGetAttInMetadata(tupdesc); + funcctx->attinmeta = attinmeta; + + MemoryContextSwitchTo(oldcontext); + } + + /* stuff done on every call of the function */ + funcctx = SRF_PERCALL_SETUP(); + + /* + * initialize per-call variables + */ + call_cntr = funcctx->call_cntr; + max_calls = funcctx->max_calls; + + slot = funcctx->slot; + + res = (PGresult *) funcctx->user_fctx; + attinmeta = funcctx->attinmeta; + tupdesc = attinmeta->tupdesc; + + if (call_cntr < max_calls) /* do when there is more left to send */ + { + char **values; + HeapTuple tuple; + Datum result; + + if (!is_sql_cmd) + { + int i; + int nfields = PQnfields(res); + + values = (char **) palloc(nfields * sizeof(char *)); + for (i = 0; i < nfields; i++) + { + if (PQgetisnull(res, call_cntr, i) == 0) + values[i] = PQgetvalue(res, call_cntr, i); + else + values[i] = NULL; + } + } + else + { + values = (char **) palloc(1 * sizeof(char *)); + values[0] = sql_cmd_status; + } + + /* build the tuple */ + tuple = BuildTupleFromCStrings(attinmeta, values); + + /* make the tuple into a datum */ + result = TupleGetDatum(slot, tuple); + + SRF_RETURN_NEXT(funcctx, result); + } + else /* do when there is no more left */ + { + PQclear(res); + SRF_RETURN_DONE(funcctx); + } +} + +/* + * Execute an SQL non-SELECT command + */ +PG_FUNCTION_INFO_V1(dblink_exec); +Datum +dblink_exec(PG_FUNCTION_ARGS) +{ + char *msg; + PGresult *res = NULL; + char *sql_cmd_status = NULL; + TupleDesc tupdesc = NULL; + text *result_text; + PGconn *conn = NULL; + char *connstr = NULL; + char *sql = NULL; + + if (fcinfo->nargs == 2) + { + connstr = GET_STR(PG_GETARG_TEXT_P(0)); + sql = GET_STR(PG_GETARG_TEXT_P(1)); + + conn = PQconnectdb(connstr); + if (PQstatus(conn) == CONNECTION_BAD) + { + msg = pstrdup(PQerrorMessage(conn)); + PQfinish(conn); + elog(ERROR, "dblink_exec: connection error: %s", msg); + } + } + else if (fcinfo->nargs == 1) + { + sql = GET_STR(PG_GETARG_TEXT_P(0)); + + if (persistent_conn != NULL) + conn = persistent_conn; + else + elog(ERROR, "dblink_exec: no connection available"); + } + else + elog(ERROR, "dblink_exec: wrong number of arguments"); + + + res = PQexec(conn, sql); + if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) + { + msg = pstrdup(PQerrorMessage(conn)); + PQclear(res); + PQfinish(conn); + if (fcinfo->nargs == 1) + persistent_conn = NULL; + + elog(ERROR, "dblink_exec: sql error: %s", msg); + } + else + { + if (PQresultStatus(res) == PGRES_COMMAND_OK) + { + /* need a tuple descriptor representing one TEXT column */ + tupdesc = CreateTemplateTupleDesc(1, WITHOUTOID); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", + TEXTOID, -1, 0, false); + + /* + * and save a copy of the command status string to return + * as our result tuple + */ + sql_cmd_status = PQcmdStatus(res); + } + else + elog(ERROR, "dblink_exec: queries returning results not allowed"); + } + PQclear(res); + + /* if needed, close the connection to the database and cleanup */ + if (fcinfo->nargs == 2) + PQfinish(conn); + + result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql_cmd_status))); + PG_RETURN_TEXT_P(result_text); +} + +/* + * Note: this original version of dblink is DEPRECATED; + * it *will* be removed in favor of the new version on next release + */ PG_FUNCTION_INFO_V1(dblink); Datum dblink(PG_FUNCTION_ARGS) @@ -179,14 +783,15 @@ dblink(PG_FUNCTION_ARGS) PG_RETURN_NULL(); } - /* + * Note: dblink_tok is DEPRECATED; + * it *will* be removed in favor of the new version on next release + * * dblink_tok * parse dblink output string * return fldnum item (0 based) * based on provided field separator */ - PG_FUNCTION_INFO_V1(dblink_tok); Datum dblink_tok(PG_FUNCTION_ARGS) @@ -241,162 +846,121 @@ dblink_tok(PG_FUNCTION_ARGS) } } - -/* - * dblink_strtok - * parse input string - * return ord item (0 based) - * based on provided field separator - */ -PG_FUNCTION_INFO_V1(dblink_strtok); -Datum -dblink_strtok(PG_FUNCTION_ARGS) -{ - char *fldtext; - char *fldsep; - int fldnum; - char *buffer; - text *result_text; - - fldtext = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0)))); - fldsep = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1)))); - fldnum = PG_GETARG_INT32(2); - - if (fldtext[0] == '\0') - { - elog(ERROR, "get_strtok: blank list not permitted"); - } - if (fldsep[0] == '\0') - { - elog(ERROR, "get_strtok: blank field separator not permitted"); - } - - buffer = get_strtok(fldtext, fldsep, fldnum); - - pfree(fldtext); - pfree(fldsep); - - if (buffer == NULL) - { - PG_RETURN_NULL(); - } - else - { - result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(buffer))); - pfree(buffer); - - PG_RETURN_TEXT_P(result_text); - } -} - - /* * dblink_get_pkey * - * Return comma delimited list of primary key - * fields for the supplied relation, + * Return list of primary key fields for the supplied relation, * or NULL if none exists. */ PG_FUNCTION_INFO_V1(dblink_get_pkey); Datum dblink_get_pkey(PG_FUNCTION_ARGS) { - text *relname_text; - Oid relid; - char **result; - text *result_text; - int16 numatts; - ReturnSetInfo *rsi; - dblink_array_results *ret_set; + int16 numatts; + Oid relid; + char **results; + FuncCallContext *funcctx; + int32 call_cntr; + int32 max_calls; + TupleTableSlot *slot; + AttInMetadata *attinmeta; + MemoryContext oldcontext; - if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo)) - elog(ERROR, "dblink: function called in context that does not accept a set result"); + /* stuff done only on the first call of the function */ + if(SRF_IS_FIRSTCALL()) + { + TupleDesc tupdesc = NULL; - if (fcinfo->flinfo->fn_extra == NULL) - { - relname_text = PG_GETARG_TEXT_P(0); + /* create a function context for cross-call persistence */ + funcctx = SRF_FIRSTCALL_INIT(); - /* - * Convert relname to rel OID. - */ - relid = get_relid_from_relname(relname_text); + /* switch to memory context appropriate for multiple function calls */ + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + /* convert relname to rel Oid */ + relid = get_relid_from_relname(PG_GETARG_TEXT_P(0)); if (!OidIsValid(relid)) elog(ERROR, "dblink_get_pkey: relation does not exist"); + /* need a tuple descriptor representing one INT and one TEXT column */ + tupdesc = CreateTemplateTupleDesc(2, WITHOUTOID); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position", + INT4OID, -1, 0, false); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname", + TEXTOID, -1, 0, false); + + /* allocate a slot for a tuple with this tupdesc */ + slot = TupleDescGetSlot(tupdesc); + + /* assign slot to function context */ + funcctx->slot = slot; + /* - * get an array of attnums. + * Generate attribute metadata needed later to produce tuples from raw + * C strings */ - result = get_pkey_attnames(relid, &numatts); + attinmeta = TupleDescGetAttInMetadata(tupdesc); + funcctx->attinmeta = attinmeta; - if ((result != NULL) && (numatts > 0)) + /* get an array of attnums */ + results = get_pkey_attnames(relid, &numatts); + + if ((results != NULL) && (numatts > 0)) { - ret_set = init_dblink_array_results(fcinfo->flinfo->fn_mcxt); + funcctx->max_calls = numatts; - ret_set->elem_num = 0; - ret_set->num_elems = numatts; - ret_set->res = result; - - fcinfo->flinfo->fn_extra = (void *) ret_set; - - rsi = (ReturnSetInfo *) fcinfo->resultinfo; - rsi->isDone = ExprMultipleResult; - - result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num]))); - - PG_RETURN_TEXT_P(result_text); + /* got results, keep track of them */ + funcctx->user_fctx = results; } - else - { - rsi = (ReturnSetInfo *) fcinfo->resultinfo; - rsi->isDone = ExprEndResult; + else /* fast track when no results */ + SRF_RETURN_DONE(funcctx); - PG_RETURN_NULL(); - } + MemoryContextSwitchTo(oldcontext); + } + + /* stuff done on every call of the function */ + funcctx = SRF_PERCALL_SETUP(); + + /* + * initialize per-call variables + */ + call_cntr = funcctx->call_cntr; + max_calls = funcctx->max_calls; + + slot = funcctx->slot; + + results = (char **) funcctx->user_fctx; + attinmeta = funcctx->attinmeta; + + if (call_cntr < max_calls) /* do when there is more left to send */ + { + char **values; + HeapTuple tuple; + Datum result; + + values = (char **) palloc(2 * sizeof(char *)); + values[0] = (char *) palloc(12); /* sign, 10 digits, '\0' */ + + sprintf(values[0], "%d", call_cntr + 1); + + values[1] = results[call_cntr]; + + /* build the tuple */ + tuple = BuildTupleFromCStrings(attinmeta, values); + + /* make the tuple into a datum */ + result = TupleGetDatum(slot, tuple); + + SRF_RETURN_NEXT(funcctx, result); } - else - { - /* - * check for more results - */ - ret_set = fcinfo->flinfo->fn_extra; - ret_set->elem_num++; - result = ret_set->res; - - if (ret_set->elem_num < ret_set->num_elems) - { - /* - * fetch next one - */ - rsi = (ReturnSetInfo *) fcinfo->resultinfo; - rsi->isDone = ExprMultipleResult; - - result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num]))); - PG_RETURN_TEXT_P(result_text); - } - else - { - int i; - - /* - * or if no more, clean things up - */ - for (i = 0; i < ret_set->num_elems; i++) - pfree(result[i]); - - pfree(ret_set->res); - pfree(ret_set); - - rsi = (ReturnSetInfo *) fcinfo->resultinfo; - rsi->isDone = ExprEndResult; - - PG_RETURN_NULL(); - } - } - PG_RETURN_NULL(); + else /* do when there is no more left */ + SRF_RETURN_DONE(funcctx); } - /* + * Note: dblink_last_oid is DEPRECATED; + * it *will* be removed on next release + * * dblink_last_oid * return last inserted oid */ @@ -447,23 +1011,26 @@ Datum dblink_build_sql_insert(PG_FUNCTION_ARGS) { Oid relid; - text *relname_text; - int16 *pkattnums; + text *relname_text; + int16 *pkattnums; int16 pknumatts; - char **src_pkattvals; - char **tgt_pkattvals; - ArrayType *src_pkattvals_arry; - ArrayType *tgt_pkattvals_arry; + char **src_pkattvals; + char **tgt_pkattvals; + ArrayType *src_pkattvals_arry; + ArrayType *tgt_pkattvals_arry; int src_ndim; - int *src_dim; + int *src_dim; int src_nitems; int tgt_ndim; int *tgt_dim; int tgt_nitems; int i; - char *ptr; - char *sql; - text *sql_text; + char *ptr; + char *sql; + text *sql_text; + int16 typlen; + bool typbyval; + char typalign; relname_text = PG_GETARG_TEXT_P(0); @@ -503,12 +1070,16 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS) * get array of pointers to c-strings from the input source array */ Assert(ARR_ELEMTYPE(src_pkattvals_arry) == TEXTOID); + get_typlenbyvalalign(ARR_ELEMTYPE(src_pkattvals_arry), + &typlen, &typbyval, &typalign); + src_pkattvals = (char **) palloc(src_nitems * sizeof(char *)); ptr = ARR_DATA_PTR(src_pkattvals_arry); for (i = 0; i < src_nitems; i++) { src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); - ptr += INTALIGN(*(int32 *) ptr); + ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr)); + ptr = (char *) att_align(ptr, typalign); } /* @@ -529,12 +1100,16 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS) * get array of pointers to c-strings from the input target array */ Assert(ARR_ELEMTYPE(tgt_pkattvals_arry) == TEXTOID); + get_typlenbyvalalign(ARR_ELEMTYPE(tgt_pkattvals_arry), + &typlen, &typbyval, &typalign); + tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *)); ptr = ARR_DATA_PTR(tgt_pkattvals_arry); for (i = 0; i < tgt_nitems; i++) { tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); - ptr += INTALIGN(*(int32 *) ptr); + ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr)); + ptr = (char *) att_align(ptr, typalign); } /* @@ -586,6 +1161,9 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS) char *ptr; char *sql; text *sql_text; + int16 typlen; + bool typbyval; + char typalign; relname_text = PG_GETARG_TEXT_P(0); @@ -624,12 +1202,16 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS) * get array of pointers to c-strings from the input target array */ Assert(ARR_ELEMTYPE(tgt_pkattvals_arry) == TEXTOID); + get_typlenbyvalalign(ARR_ELEMTYPE(tgt_pkattvals_arry), + &typlen, &typbyval, &typalign); + tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *)); ptr = ARR_DATA_PTR(tgt_pkattvals_arry); for (i = 0; i < tgt_nitems; i++) { tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); - ptr += INTALIGN(*(int32 *) ptr); + ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr)); + ptr = (char *) att_align(ptr, typalign); } /* @@ -690,6 +1272,9 @@ dblink_build_sql_update(PG_FUNCTION_ARGS) char *ptr; char *sql; text *sql_text; + int16 typlen; + bool typbyval; + char typalign; relname_text = PG_GETARG_TEXT_P(0); @@ -729,12 +1314,16 @@ dblink_build_sql_update(PG_FUNCTION_ARGS) * get array of pointers to c-strings from the input source array */ Assert(ARR_ELEMTYPE(src_pkattvals_arry) == TEXTOID); + get_typlenbyvalalign(ARR_ELEMTYPE(src_pkattvals_arry), + &typlen, &typbyval, &typalign); + src_pkattvals = (char **) palloc(src_nitems * sizeof(char *)); ptr = ARR_DATA_PTR(src_pkattvals_arry); for (i = 0; i < src_nitems; i++) { src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); - ptr += INTALIGN(*(int32 *) ptr); + ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr)); + ptr = (char *) att_align(ptr, typalign); } /* @@ -755,12 +1344,16 @@ dblink_build_sql_update(PG_FUNCTION_ARGS) * get array of pointers to c-strings from the input target array */ Assert(ARR_ELEMTYPE(tgt_pkattvals_arry) == TEXTOID); + get_typlenbyvalalign(ARR_ELEMTYPE(tgt_pkattvals_arry), + &typlen, &typbyval, &typalign); + tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *)); ptr = ARR_DATA_PTR(tgt_pkattvals_arry); for (i = 0; i < tgt_nitems; i++) { tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); - ptr += INTALIGN(*(int32 *) ptr); + ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr)); + ptr = (char *) att_align(ptr, typalign); } /* @@ -779,7 +1372,6 @@ dblink_build_sql_update(PG_FUNCTION_ARGS) PG_RETURN_TEXT_P(sql_text); } - /* * dblink_current_query * return the current query string @@ -797,64 +1389,6 @@ dblink_current_query(PG_FUNCTION_ARGS) } -/* - * dblink_replace_text - * replace all occurences of 'old_sub_str' in 'orig_str' - * with 'new_sub_str' to form 'new_str' - * - * returns 'orig_str' if 'old_sub_str' == '' or 'orig_str' == '' - * otherwise returns 'new_str' - */ -PG_FUNCTION_INFO_V1(dblink_replace_text); -Datum -dblink_replace_text(PG_FUNCTION_ARGS) -{ - text *left_text; - text *right_text; - text *buf_text; - text *ret_text; - char *ret_str; - int curr_posn; - text *src_text = PG_GETARG_TEXT_P(0); - int src_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(src_text))); - text *from_sub_text = PG_GETARG_TEXT_P(1); - int from_sub_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(from_sub_text))); - text *to_sub_text = PG_GETARG_TEXT_P(2); - char *to_sub_str = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(to_sub_text))); - StringInfo str = makeStringInfo(); - - if (src_text_len == 0 || from_sub_text_len == 0) - PG_RETURN_TEXT_P(src_text); - - buf_text = DatumGetTextPCopy(PointerGetDatum(src_text)); - curr_posn = DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))); - - while (curr_posn > 0) - { - left_text = DatumGetTextP(DirectFunctionCall3(text_substr, PointerGetDatum(buf_text), 1, DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))) - 1)); - right_text = DatumGetTextP(DirectFunctionCall3(text_substr, PointerGetDatum(buf_text), DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))) + from_sub_text_len, -1)); - - appendStringInfo(str, "%s", - DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(left_text)))); - appendStringInfo(str, "%s", to_sub_str); - - pfree(buf_text); - pfree(left_text); - buf_text = right_text; - curr_posn = DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))); - } - - appendStringInfo(str, "%s", - DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(buf_text)))); - pfree(buf_text); - - ret_str = pstrdup(str->data); - ret_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(ret_str))); - - PG_RETURN_TEXT_P(ret_text); -} - - /************************************************************* * internal functions */ @@ -884,31 +1418,6 @@ init_dblink_results(MemoryContext fn_mcxt) return retval; } - -/* - * init_dblink_array_results - * - create an empty dblink_array_results data structure - */ -static dblink_array_results * -init_dblink_array_results(MemoryContext fn_mcxt) -{ - MemoryContext oldcontext; - dblink_array_results *retval; - - oldcontext = MemoryContextSwitchTo(fn_mcxt); - - retval = (dblink_array_results *) palloc(sizeof(dblink_array_results)); - MemSet(retval, 0, sizeof(dblink_array_results)); - - retval->elem_num = -1; - retval->num_elems = 0; - retval->res = NULL; - - MemoryContextSwitchTo(oldcontext); - - return retval; -} - /* * get_pkey_attnames * @@ -927,21 +1436,14 @@ get_pkey_attnames(Oid relid, int16 *numatts) Relation rel; TupleDesc tupdesc; - /* - * Open relation using relid, get tupdesc - */ + /* open relation using relid, get tupdesc */ rel = relation_open(relid, AccessShareLock); tupdesc = rel->rd_att; - /* - * Initialize numatts to 0 in case no primary key - * exists - */ + /* initialize numatts to 0 in case no primary key exists */ *numatts = 0; - /* - * Use relid to get all related indexes - */ + /* use relid to get all related indexes */ indexRelation = heap_openr(IndexRelationName, AccessShareLock); ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indrelid, F_OIDEQ, ObjectIdGetDatum(relid)); @@ -951,9 +1453,7 @@ get_pkey_attnames(Oid relid, int16 *numatts) { Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple); - /* - * We're only interested if it is the primary key - */ + /* we're only interested if it is the primary key */ if (index->indisprimary == TRUE) { i = 0; @@ -963,6 +1463,7 @@ get_pkey_attnames(Oid relid, int16 *numatts) if (*numatts > 0) { result = (char **) palloc(*numatts * sizeof(char *)); + for (i = 0; i < *numatts; i++) result[i] = SPI_fname(tupdesc, index->indkey[i]); } @@ -976,41 +1477,6 @@ get_pkey_attnames(Oid relid, int16 *numatts) return result; } - -/* - * get_strtok - * - * parse input string - * return ord item (0 based) - * based on provided field separator - */ -static char * -get_strtok(char *fldtext, char *fldsep, int fldnum) -{ - int j = 0; - char *result; - - if (fldnum < 0) - { - elog(ERROR, "get_strtok: field number < 0 not permitted"); - } - - if (fldsep[0] == '\0') - { - elog(ERROR, "get_strtok: blank field separator not permitted"); - } - - result = strtok(fldtext, fldsep); - for (j = 1; j < fldnum + 1; j++) - { - result = strtok(NULL, fldsep); - if (result == NULL) - return NULL; - } - - return pstrdup(result); -} - static char * get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals) { @@ -1035,6 +1501,8 @@ get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattval natts = tupdesc->natts; tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals); + if (!tuple) + elog(ERROR, "dblink_build_sql_insert: row not found"); appendStringInfo(str, "INSERT INTO %s(", quote_ident_cstr(relname)); @@ -1175,6 +1643,8 @@ get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattval natts = tupdesc->natts; tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals); + if (!tuple) + elog(ERROR, "dblink_build_sql_update: row not found"); appendStringInfo(str, "UPDATE %s SET ", quote_ident_cstr(relname)); @@ -1314,7 +1784,8 @@ get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_p */ rel = relation_open(relid, AccessShareLock); relname = RelationGetRelationName(rel); - tupdesc = rel->rd_att; + tupdesc = CreateTupleDescCopy(rel->rd_att); + relation_close(rel, AccessShareLock); /* * Connect to SPI manager @@ -1388,7 +1859,6 @@ get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_p static Oid get_relid_from_relname(text *relname_text) { -#ifdef NamespaceRelationName RangeVar *relvar; Relation rel; Oid relid; @@ -1397,16 +1867,6 @@ get_relid_from_relname(text *relname_text) rel = heap_openrv(relvar, AccessShareLock); relid = RelationGetRelid(rel); relation_close(rel, AccessShareLock); -#else - char *relname; - Relation rel; - Oid relid; - - relname = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(relname_text))); - rel = relation_openr(relname, AccessShareLock); - relid = RelationGetRelid(rel); - relation_close(rel, AccessShareLock); -#endif /* NamespaceRelationName */ return relid; } @@ -1456,3 +1916,55 @@ remove_res_ptr(dblink_results *results) res_id_index = 0; } +static TupleDesc +pgresultGetTupleDesc(PGresult *res) +{ + int natts; + AttrNumber attnum; + TupleDesc desc; + char *attname; + int32 atttypmod; + int attdim; + bool attisset; + Oid atttypid; + int i; + + /* + * allocate a new tuple descriptor + */ + natts = PQnfields(res); + if (natts < 1) + elog(ERROR, "cannot create a description for empty results"); + + desc = CreateTemplateTupleDesc(natts, WITHOUTOID); + + attnum = 0; + + for (i = 0; i < natts; i++) + { + /* + * for each field, get the name and type information from the query + * result and have TupleDescInitEntry fill in the attribute + * information we need. + */ + attnum++; + + attname = PQfname(res, i); + atttypid = PQftype(res, i); + atttypmod = PQfmod(res, i); + + if (PQfsize(res, i) != get_typlen(atttypid)) + elog(ERROR, "Size of remote field \"%s\" does not match size " + "of local type \"%s\"", + attname, + format_type_with_typemod(atttypid, atttypmod)); + + attdim = 0; + attisset = false; + + TupleDescInitEntry(desc, attnum, attname, atttypid, + atttypmod, attdim, attisset); + } + + return desc; +} diff --git a/contrib/dblink/dblink.h b/contrib/dblink/dblink.h index 4d53005ac6..ddca6241c4 100644 --- a/contrib/dblink/dblink.h +++ b/contrib/dblink/dblink.h @@ -3,7 +3,9 @@ * * Functions returning results from a remote database * - * Copyright (c) Joseph Conway , 2001, 2002, + * Joe Conway + * + * Copyright (c) 2001, 2002 by PostgreSQL Global Development Group * ALL RIGHTS RESERVED; * * Permission to use, copy, modify, and distribute this software and its @@ -28,38 +30,6 @@ #ifndef DBLINK_H #define DBLINK_H -#include -#include "postgres.h" -#include "libpq-fe.h" -#include "libpq-int.h" -#include "fmgr.h" -#include "access/tupdesc.h" -#include "access/heapam.h" -#include "catalog/catname.h" -#include "catalog/pg_index.h" -#include "catalog/pg_type.h" -#include "executor/executor.h" -#include "executor/spi.h" -#include "lib/stringinfo.h" -#include "nodes/nodes.h" -#include "nodes/execnodes.h" -#include "nodes/pg_list.h" -#include "parser/parse_type.h" -#include "tcop/tcopprot.h" -#include "utils/builtins.h" -#include "utils/fmgroids.h" -#include "utils/array.h" -#include "utils/syscache.h" - -#ifdef NamespaceRelationName -#include "catalog/namespace.h" -#endif /* NamespaceRelationName */ - -/* - * Max SQL statement size - */ -#define DBLINK_MAX_SQLSTATE_SIZE 16384 - /* * This struct holds the results of the remote query. * Use fn_extra to hold a pointer to it across calls @@ -82,43 +52,27 @@ typedef struct PGresult *res; } dblink_results; - -/* - * This struct holds results in the form of an array. - * Use fn_extra to hold a pointer to it across calls - */ -typedef struct -{ - /* - * elem being accessed - */ - int elem_num; - - /* - * number of elems - */ - int num_elems; - - /* - * the actual array - */ - void *res; - -} dblink_array_results; - /* * External declarations */ +/* deprecated */ extern Datum dblink(PG_FUNCTION_ARGS); extern Datum dblink_tok(PG_FUNCTION_ARGS); -extern Datum dblink_strtok(PG_FUNCTION_ARGS); + +/* supported */ +extern Datum dblink_connect(PG_FUNCTION_ARGS); +extern Datum dblink_disconnect(PG_FUNCTION_ARGS); +extern Datum dblink_open(PG_FUNCTION_ARGS); +extern Datum dblink_close(PG_FUNCTION_ARGS); +extern Datum dblink_fetch(PG_FUNCTION_ARGS); +extern Datum dblink_record(PG_FUNCTION_ARGS); +extern Datum dblink_exec(PG_FUNCTION_ARGS); extern Datum dblink_get_pkey(PG_FUNCTION_ARGS); extern Datum dblink_last_oid(PG_FUNCTION_ARGS); extern Datum dblink_build_sql_insert(PG_FUNCTION_ARGS); extern Datum dblink_build_sql_delete(PG_FUNCTION_ARGS); extern Datum dblink_build_sql_update(PG_FUNCTION_ARGS); extern Datum dblink_current_query(PG_FUNCTION_ARGS); -extern Datum dblink_replace_text(PG_FUNCTION_ARGS); extern char *debug_query_string; diff --git a/contrib/dblink/dblink.sql.in b/contrib/dblink/dblink.sql.in index bea4378907..b92801a5c5 100644 --- a/contrib/dblink/dblink.sql.in +++ b/contrib/dblink/dblink.sql.in @@ -1,23 +1,60 @@ -CREATE OR REPLACE FUNCTION dblink (text,text) RETURNS setof int - AS 'MODULE_PATHNAME','dblink' LANGUAGE 'c' +-- Uncomment the following 9 lines to use original DEPRECATED functions +--CREATE OR REPLACE FUNCTION dblink (text,text) RETURNS setof int +-- AS 'MODULE_PATHNAME','dblink' LANGUAGE 'c' +-- WITH (isstrict); +--CREATE OR REPLACE FUNCTION dblink_tok (int,int) RETURNS text +-- AS 'MODULE_PATHNAME','dblink_tok' LANGUAGE 'c' +-- WITH (isstrict); +--CREATE OR REPLACE FUNCTION dblink_last_oid (int) RETURNS oid +-- AS 'MODULE_PATHNAME','dblink_last_oid' LANGUAGE 'c' +-- WITH (isstrict); + +CREATE OR REPLACE FUNCTION dblink_connect (text) RETURNS text + AS 'MODULE_PATHNAME','dblink_connect' LANGUAGE 'c' WITH (isstrict); -CREATE OR REPLACE FUNCTION dblink_tok (int,int) RETURNS text - AS 'MODULE_PATHNAME','dblink_tok' LANGUAGE 'c' +CREATE OR REPLACE FUNCTION dblink_disconnect () RETURNS text + AS 'MODULE_PATHNAME','dblink_disconnect' LANGUAGE 'c' WITH (isstrict); -CREATE OR REPLACE FUNCTION dblink_strtok (text,text,int) RETURNS text - AS 'MODULE_PATHNAME','dblink_strtok' LANGUAGE 'c' - WITH (iscachable, isstrict); +CREATE OR REPLACE FUNCTION dblink_open (text,text) RETURNS text + AS 'MODULE_PATHNAME','dblink_open' LANGUAGE 'c' + WITH (isstrict); -CREATE OR REPLACE FUNCTION dblink_get_pkey (text) RETURNS setof text +CREATE OR REPLACE FUNCTION dblink_fetch (text,int) RETURNS setof record + AS 'MODULE_PATHNAME','dblink_fetch' LANGUAGE 'c' + WITH (isstrict); + +CREATE OR REPLACE FUNCTION dblink_close (text) RETURNS text + AS 'MODULE_PATHNAME','dblink_close' LANGUAGE 'c' + WITH (isstrict); + +-- Note: if this is a first time install of dblink, the following DROP +-- FUNCTION line is expected to fail. +-- Comment out the following 4 lines if the DEPRECATED functions are used. +DROP FUNCTION dblink (text,text); +CREATE OR REPLACE FUNCTION dblink (text,text) RETURNS setof record + AS 'MODULE_PATHNAME','dblink_record' LANGUAGE 'c' + WITH (isstrict); + +CREATE OR REPLACE FUNCTION dblink (text) RETURNS setof record + AS 'MODULE_PATHNAME','dblink_record' LANGUAGE 'c' + WITH (isstrict); + +CREATE OR REPLACE FUNCTION dblink_exec (text,text) RETURNS text + AS 'MODULE_PATHNAME','dblink_exec' LANGUAGE 'c' + WITH (isstrict); + +CREATE OR REPLACE FUNCTION dblink_exec (text) RETURNS text + AS 'MODULE_PATHNAME','dblink_exec' LANGUAGE 'c' + WITH (isstrict); + +CREATE TYPE dblink_pkey_results AS (position int4, colname text); + +CREATE OR REPLACE FUNCTION dblink_get_pkey (text) RETURNS setof dblink_pkey_results AS 'MODULE_PATHNAME','dblink_get_pkey' LANGUAGE 'c' WITH (isstrict); -CREATE OR REPLACE FUNCTION dblink_last_oid (int) RETURNS oid - AS 'MODULE_PATHNAME','dblink_last_oid' LANGUAGE 'c' - WITH (isstrict); - CREATE OR REPLACE FUNCTION dblink_build_sql_insert (text, int2vector, int2, _text, _text) RETURNS text AS 'MODULE_PATHNAME','dblink_build_sql_insert' LANGUAGE 'c' WITH (isstrict); @@ -32,7 +69,3 @@ CREATE OR REPLACE FUNCTION dblink_build_sql_update (text, int2vector, int2, _tex CREATE OR REPLACE FUNCTION dblink_current_query () RETURNS text AS 'MODULE_PATHNAME','dblink_current_query' LANGUAGE 'c'; - -CREATE OR REPLACE FUNCTION dblink_replace (text,text,text) RETURNS text - AS 'MODULE_PATHNAME','dblink_replace_text' LANGUAGE 'c' - WITH (iscachable, isstrict);