postgresql/contrib/dblink/dblink.c
Tom Lane 5cabcfccce Modify array operations to include array's element type OID in the
array header, and to compute sizing and alignment of array elements
the same way normal tuple access operations do --- viz, using the
tupmacs.h macros att_addlength and att_align.  This makes the world
safe for arrays of cstrings or intervals, and should make it much
easier to write array-type-polymorphic functions; as examples see
the cleanups of array_out and contrib/array_iterator.  By Joe Conway
and Tom Lane.
2002-08-26 17:54:02 +00:00

1459 lines
34 KiB
C

/*
* dblink.c
*
* Functions returning results from a remote database
*
* Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002,
* ALL RIGHTS RESERVED;
*
* Permission to use, copy, modify, and distribute this software and its
* documentation for any purpose, without fee, and without a written agreement
* is hereby granted, provided that the above copyright notice and this
* paragraph and the following two paragraphs appear in all copies.
*
* IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
* DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
* LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
* DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
* AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
* ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
* PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
*
*/
#include "dblink.h"
/*
* Internal declarations
*/
static dblink_results *init_dblink_results(MemoryContext fn_mcxt);
static dblink_array_results *init_dblink_array_results(MemoryContext fn_mcxt);
static char **get_pkey_attnames(Oid relid, int16 *numatts);
static char *get_strtok(char *fldtext, char *fldsep, int fldnum);
static char *get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
static char *get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals);
static char *get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
static char *quote_literal_cstr(char *rawstr);
static char *quote_ident_cstr(char *rawstr);
static int16 get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key);
static HeapTuple get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals);
static Oid get_relid_from_relname(text *relname_text);
static dblink_results *get_res_ptr(int32 res_id_index);
static void append_res_ptr(dblink_results *results);
static void remove_res_ptr(dblink_results *results);
/* Global */
List *res_id = NIL;
int res_id_index = 0;
PG_FUNCTION_INFO_V1(dblink);
Datum
dblink(PG_FUNCTION_ARGS)
{
PGconn *conn = NULL;
PGresult *res = NULL;
dblink_results *results;
char *optstr;
char *sqlstatement;
char *execstatement;
char *msg;
int ntuples = 0;
ReturnSetInfo *rsi;
if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo))
elog(ERROR, "dblink: function called in context that does not accept a set result");
optstr = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0))));
sqlstatement = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1))));
if (fcinfo->flinfo->fn_extra == NULL)
{
conn = PQconnectdb(optstr);
if (PQstatus(conn) == CONNECTION_BAD)
{
msg = pstrdup(PQerrorMessage(conn));
PQfinish(conn);
elog(ERROR, "dblink: connection error: %s", msg);
}
execstatement = (char *) palloc(strlen(sqlstatement) + 1);
if (execstatement != NULL)
{
strcpy(execstatement, sqlstatement);
strcat(execstatement, "\0");
}
else
elog(ERROR, "dblink: insufficient memory");
res = PQexec(conn, execstatement);
if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK))
{
msg = pstrdup(PQerrorMessage(conn));
PQclear(res);
PQfinish(conn);
elog(ERROR, "dblink: sql error: %s", msg);
}
else
{
/*
* got results, start fetching them
*/
ntuples = PQntuples(res);
/*
* increment resource index
*/
res_id_index++;
results = init_dblink_results(fcinfo->flinfo->fn_mcxt);
results->tup_num = 0;
results->res_id_index = res_id_index;
results->res = res;
/*
* Append node to res_id to hold pointer to results.
* Needed by dblink_tok to access the data
*/
append_res_ptr(results);
/*
* save pointer to results for the next function manager call
*/
fcinfo->flinfo->fn_extra = (void *) results;
/* close the connection to the database and cleanup */
PQfinish(conn);
rsi = (ReturnSetInfo *) fcinfo->resultinfo;
rsi->isDone = ExprMultipleResult;
PG_RETURN_INT32(res_id_index);
}
}
else
{
/*
* check for more results
*/
results = fcinfo->flinfo->fn_extra;
results->tup_num++;
res_id_index = results->res_id_index;
ntuples = PQntuples(results->res);
if (results->tup_num < ntuples)
{
/*
* fetch them if available
*/
rsi = (ReturnSetInfo *) fcinfo->resultinfo;
rsi->isDone = ExprMultipleResult;
PG_RETURN_INT32(res_id_index);
}
else
{
/*
* or if no more, clean things up
*/
results = fcinfo->flinfo->fn_extra;
remove_res_ptr(results);
PQclear(results->res);
pfree(results);
fcinfo->flinfo->fn_extra = NULL;
rsi = (ReturnSetInfo *) fcinfo->resultinfo;
rsi->isDone = ExprEndResult;
PG_RETURN_NULL();
}
}
PG_RETURN_NULL();
}
/*
* dblink_tok
* parse dblink output string
* return fldnum item (0 based)
* based on provided field separator
*/
PG_FUNCTION_INFO_V1(dblink_tok);
Datum
dblink_tok(PG_FUNCTION_ARGS)
{
dblink_results *results;
int fldnum;
text *result_text;
char *result;
int nfields = 0;
int text_len = 0;
results = get_res_ptr(PG_GETARG_INT32(0));
if (results == NULL)
{
if (res_id != NIL)
{
freeList(res_id);
res_id = NIL;
res_id_index = 0;
}
elog(ERROR, "dblink_tok: function called with invalid resource id");
}
fldnum = PG_GETARG_INT32(1);
if (fldnum < 0)
elog(ERROR, "dblink_tok: field number < 0 not permitted");
nfields = PQnfields(results->res);
if (fldnum > (nfields - 1))
elog(ERROR, "dblink_tok: field number %d does not exist", fldnum);
if (PQgetisnull(results->res, results->tup_num, fldnum) == 1)
PG_RETURN_NULL();
else
{
text_len = PQgetlength(results->res, results->tup_num, fldnum);
result = (char *) palloc(text_len + 1);
if (result != NULL)
{
strcpy(result, PQgetvalue(results->res, results->tup_num, fldnum));
strcat(result, "\0");
}
else
elog(ERROR, "dblink: insufficient memory");
result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result)));
PG_RETURN_TEXT_P(result_text);
}
}
/*
* dblink_strtok
* parse input string
* return ord item (0 based)
* based on provided field separator
*/
PG_FUNCTION_INFO_V1(dblink_strtok);
Datum
dblink_strtok(PG_FUNCTION_ARGS)
{
char *fldtext;
char *fldsep;
int fldnum;
char *buffer;
text *result_text;
fldtext = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0))));
fldsep = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1))));
fldnum = PG_GETARG_INT32(2);
if (fldtext[0] == '\0')
{
elog(ERROR, "get_strtok: blank list not permitted");
}
if (fldsep[0] == '\0')
{
elog(ERROR, "get_strtok: blank field separator not permitted");
}
buffer = get_strtok(fldtext, fldsep, fldnum);
pfree(fldtext);
pfree(fldsep);
if (buffer == NULL)
{
PG_RETURN_NULL();
}
else
{
result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(buffer)));
pfree(buffer);
PG_RETURN_TEXT_P(result_text);
}
}
/*
* dblink_get_pkey
*
* Return comma delimited list of primary key
* fields for the supplied relation,
* or NULL if none exists.
*/
PG_FUNCTION_INFO_V1(dblink_get_pkey);
Datum
dblink_get_pkey(PG_FUNCTION_ARGS)
{
text *relname_text;
Oid relid;
char **result;
text *result_text;
int16 numatts;
ReturnSetInfo *rsi;
dblink_array_results *ret_set;
if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo))
elog(ERROR, "dblink: function called in context that does not accept a set result");
if (fcinfo->flinfo->fn_extra == NULL)
{
relname_text = PG_GETARG_TEXT_P(0);
/*
* Convert relname to rel OID.
*/
relid = get_relid_from_relname(relname_text);
if (!OidIsValid(relid))
elog(ERROR, "dblink_get_pkey: relation does not exist");
/*
* get an array of attnums.
*/
result = get_pkey_attnames(relid, &numatts);
if ((result != NULL) && (numatts > 0))
{
ret_set = init_dblink_array_results(fcinfo->flinfo->fn_mcxt);
ret_set->elem_num = 0;
ret_set->num_elems = numatts;
ret_set->res = result;
fcinfo->flinfo->fn_extra = (void *) ret_set;
rsi = (ReturnSetInfo *) fcinfo->resultinfo;
rsi->isDone = ExprMultipleResult;
result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num])));
PG_RETURN_TEXT_P(result_text);
}
else
{
rsi = (ReturnSetInfo *) fcinfo->resultinfo;
rsi->isDone = ExprEndResult;
PG_RETURN_NULL();
}
}
else
{
/*
* check for more results
*/
ret_set = fcinfo->flinfo->fn_extra;
ret_set->elem_num++;
result = ret_set->res;
if (ret_set->elem_num < ret_set->num_elems)
{
/*
* fetch next one
*/
rsi = (ReturnSetInfo *) fcinfo->resultinfo;
rsi->isDone = ExprMultipleResult;
result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num])));
PG_RETURN_TEXT_P(result_text);
}
else
{
int i;
/*
* or if no more, clean things up
*/
for (i = 0; i < ret_set->num_elems; i++)
pfree(result[i]);
pfree(ret_set->res);
pfree(ret_set);
rsi = (ReturnSetInfo *) fcinfo->resultinfo;
rsi->isDone = ExprEndResult;
PG_RETURN_NULL();
}
}
PG_RETURN_NULL();
}
/*
* dblink_last_oid
* return last inserted oid
*/
PG_FUNCTION_INFO_V1(dblink_last_oid);
Datum
dblink_last_oid(PG_FUNCTION_ARGS)
{
dblink_results *results;
results = get_res_ptr(PG_GETARG_INT32(0));
if (results == NULL)
{
if (res_id != NIL)
{
freeList(res_id);
res_id = NIL;
res_id_index = 0;
}
elog(ERROR, "dblink_tok: function called with invalid resource id");
}
PG_RETURN_OID(PQoidValue(results->res));
}
/*
* dblink_build_sql_insert
*
* Used to generate an SQL insert statement
* based on an existing tuple in a local relation.
* This is useful for selectively replicating data
* to another server via dblink.
*
* API:
* <relname> - name of local table of interest
* <pkattnums> - an int2vector of attnums which will be used
* to identify the local tuple of interest
* <pknumatts> - number of attnums in pkattnums
* <src_pkattvals_arry> - text array of key values which will be used
* to identify the local tuple of interest
* <tgt_pkattvals_arry> - text array of key values which will be used
* to build the string for execution remotely. These are substituted
* for their counterparts in src_pkattvals_arry
*/
PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
Datum
dblink_build_sql_insert(PG_FUNCTION_ARGS)
{
Oid relid;
text *relname_text;
int16 *pkattnums;
int16 pknumatts;
char **src_pkattvals;
char **tgt_pkattvals;
ArrayType *src_pkattvals_arry;
ArrayType *tgt_pkattvals_arry;
int src_ndim;
int *src_dim;
int src_nitems;
int tgt_ndim;
int *tgt_dim;
int tgt_nitems;
int i;
char *ptr;
char *sql;
text *sql_text;
relname_text = PG_GETARG_TEXT_P(0);
/*
* Convert relname to rel OID.
*/
relid = get_relid_from_relname(relname_text);
if (!OidIsValid(relid))
elog(ERROR, "dblink_build_sql_insert: relation does not exist");
pkattnums = (int16 *) PG_GETARG_POINTER(1);
pknumatts = PG_GETARG_INT16(2);
/*
* There should be at least one key attribute
*/
if (pknumatts == 0)
elog(ERROR, "dblink_build_sql_insert: number of key attributes must be > 0.");
src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
/*
* Source array is made up of key values that will be used to
* locate the tuple of interest from the local system.
*/
src_ndim = ARR_NDIM(src_pkattvals_arry);
src_dim = ARR_DIMS(src_pkattvals_arry);
src_nitems = ArrayGetNItems(src_ndim, src_dim);
/*
* There should be one source array key value for each key attnum
*/
if (src_nitems != pknumatts)
elog(ERROR, "dblink_build_sql_insert: source key array length does not match number of key attributes.");
/*
* get array of pointers to c-strings from the input source array
*/
Assert(ARR_ELEMTYPE(src_pkattvals_arry) == TEXTOID);
src_pkattvals = (char **) palloc(src_nitems * sizeof(char *));
ptr = ARR_DATA_PTR(src_pkattvals_arry);
for (i = 0; i < src_nitems; i++)
{
src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
ptr += INTALIGN(*(int32 *) ptr);
}
/*
* Target array is made up of key values that will be used to
* build the SQL string for use on the remote system.
*/
tgt_ndim = ARR_NDIM(tgt_pkattvals_arry);
tgt_dim = ARR_DIMS(tgt_pkattvals_arry);
tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim);
/*
* There should be one target array key value for each key attnum
*/
if (tgt_nitems != pknumatts)
elog(ERROR, "dblink_build_sql_insert: target key array length does not match number of key attributes.");
/*
* get array of pointers to c-strings from the input target array
*/
Assert(ARR_ELEMTYPE(tgt_pkattvals_arry) == TEXTOID);
tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
for (i = 0; i < tgt_nitems; i++)
{
tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
ptr += INTALIGN(*(int32 *) ptr);
}
/*
* Prep work is finally done. Go get the SQL string.
*/
sql = get_sql_insert(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
/*
* Make it into TEXT for return to the client
*/
sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));
/*
* And send it
*/
PG_RETURN_TEXT_P(sql_text);
}
/*
* dblink_build_sql_delete
*
* Used to generate an SQL delete statement.
* This is useful for selectively replicating a
* delete to another server via dblink.
*
* API:
* <relname> - name of remote table of interest
* <pkattnums> - an int2vector of attnums which will be used
* to identify the remote tuple of interest
* <pknumatts> - number of attnums in pkattnums
* <tgt_pkattvals_arry> - text array of key values which will be used
* to build the string for execution remotely.
*/
PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
Datum
dblink_build_sql_delete(PG_FUNCTION_ARGS)
{
Oid relid;
text *relname_text;
int16 *pkattnums;
int16 pknumatts;
char **tgt_pkattvals;
ArrayType *tgt_pkattvals_arry;
int tgt_ndim;
int *tgt_dim;
int tgt_nitems;
int i;
char *ptr;
char *sql;
text *sql_text;
relname_text = PG_GETARG_TEXT_P(0);
/*
* Convert relname to rel OID.
*/
relid = get_relid_from_relname(relname_text);
if (!OidIsValid(relid))
elog(ERROR, "dblink_build_sql_delete: relation does not exist");
pkattnums = (int16 *) PG_GETARG_POINTER(1);
pknumatts = PG_GETARG_INT16(2);
/*
* There should be at least one key attribute
*/
if (pknumatts == 0)
elog(ERROR, "dblink_build_sql_insert: number of key attributes must be > 0.");
tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
/*
* Target array is made up of key values that will be used to
* build the SQL string for use on the remote system.
*/
tgt_ndim = ARR_NDIM(tgt_pkattvals_arry);
tgt_dim = ARR_DIMS(tgt_pkattvals_arry);
tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim);
/*
* There should be one target array key value for each key attnum
*/
if (tgt_nitems != pknumatts)
elog(ERROR, "dblink_build_sql_insert: target key array length does not match number of key attributes.");
/*
* get array of pointers to c-strings from the input target array
*/
Assert(ARR_ELEMTYPE(tgt_pkattvals_arry) == TEXTOID);
tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
for (i = 0; i < tgt_nitems; i++)
{
tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
ptr += INTALIGN(*(int32 *) ptr);
}
/*
* Prep work is finally done. Go get the SQL string.
*/
sql = get_sql_delete(relid, pkattnums, pknumatts, tgt_pkattvals);
/*
* Make it into TEXT for return to the client
*/
sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));
/*
* And send it
*/
PG_RETURN_TEXT_P(sql_text);
}
/*
* dblink_build_sql_update
*
* Used to generate an SQL update statement
* based on an existing tuple in a local relation.
* This is useful for selectively replicating data
* to another server via dblink.
*
* API:
* <relname> - name of local table of interest
* <pkattnums> - an int2vector of attnums which will be used
* to identify the local tuple of interest
* <pknumatts> - number of attnums in pkattnums
* <src_pkattvals_arry> - text array of key values which will be used
* to identify the local tuple of interest
* <tgt_pkattvals_arry> - text array of key values which will be used
* to build the string for execution remotely. These are substituted
* for their counterparts in src_pkattvals_arry
*/
PG_FUNCTION_INFO_V1(dblink_build_sql_update);
Datum
dblink_build_sql_update(PG_FUNCTION_ARGS)
{
Oid relid;
text *relname_text;
int16 *pkattnums;
int16 pknumatts;
char **src_pkattvals;
char **tgt_pkattvals;
ArrayType *src_pkattvals_arry;
ArrayType *tgt_pkattvals_arry;
int src_ndim;
int *src_dim;
int src_nitems;
int tgt_ndim;
int *tgt_dim;
int tgt_nitems;
int i;
char *ptr;
char *sql;
text *sql_text;
relname_text = PG_GETARG_TEXT_P(0);
/*
* Convert relname to rel OID.
*/
relid = get_relid_from_relname(relname_text);
if (!OidIsValid(relid))
elog(ERROR, "dblink_build_sql_update: relation does not exist");
pkattnums = (int16 *) PG_GETARG_POINTER(1);
pknumatts = PG_GETARG_INT16(2);
/*
* There should be one source array key values for each key attnum
*/
if (pknumatts == 0)
elog(ERROR, "dblink_build_sql_insert: number of key attributes must be > 0.");
src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
/*
* Source array is made up of key values that will be used to
* locate the tuple of interest from the local system.
*/
src_ndim = ARR_NDIM(src_pkattvals_arry);
src_dim = ARR_DIMS(src_pkattvals_arry);
src_nitems = ArrayGetNItems(src_ndim, src_dim);
/*
* There should be one source array key value for each key attnum
*/
if (src_nitems != pknumatts)
elog(ERROR, "dblink_build_sql_insert: source key array length does not match number of key attributes.");
/*
* get array of pointers to c-strings from the input source array
*/
Assert(ARR_ELEMTYPE(src_pkattvals_arry) == TEXTOID);
src_pkattvals = (char **) palloc(src_nitems * sizeof(char *));
ptr = ARR_DATA_PTR(src_pkattvals_arry);
for (i = 0; i < src_nitems; i++)
{
src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
ptr += INTALIGN(*(int32 *) ptr);
}
/*
* Target array is made up of key values that will be used to
* build the SQL string for use on the remote system.
*/
tgt_ndim = ARR_NDIM(tgt_pkattvals_arry);
tgt_dim = ARR_DIMS(tgt_pkattvals_arry);
tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim);
/*
* There should be one target array key value for each key attnum
*/
if (tgt_nitems != pknumatts)
elog(ERROR, "dblink_build_sql_insert: target key array length does not match number of key attributes.");
/*
* get array of pointers to c-strings from the input target array
*/
Assert(ARR_ELEMTYPE(tgt_pkattvals_arry) == TEXTOID);
tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
for (i = 0; i < tgt_nitems; i++)
{
tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
ptr += INTALIGN(*(int32 *) ptr);
}
/*
* Prep work is finally done. Go get the SQL string.
*/
sql = get_sql_update(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
/*
* Make it into TEXT for return to the client
*/
sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));
/*
* And send it
*/
PG_RETURN_TEXT_P(sql_text);
}
/*
* dblink_current_query
* return the current query string
* to allow its use in (among other things)
* rewrite rules
*/
PG_FUNCTION_INFO_V1(dblink_current_query);
Datum
dblink_current_query(PG_FUNCTION_ARGS)
{
text *result_text;
result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(debug_query_string)));
PG_RETURN_TEXT_P(result_text);
}
/*
* dblink_replace_text
* replace all occurences of 'old_sub_str' in 'orig_str'
* with 'new_sub_str' to form 'new_str'
*
* returns 'orig_str' if 'old_sub_str' == '' or 'orig_str' == ''
* otherwise returns 'new_str'
*/
PG_FUNCTION_INFO_V1(dblink_replace_text);
Datum
dblink_replace_text(PG_FUNCTION_ARGS)
{
text *left_text;
text *right_text;
text *buf_text;
text *ret_text;
char *ret_str;
int curr_posn;
text *src_text = PG_GETARG_TEXT_P(0);
int src_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(src_text)));
text *from_sub_text = PG_GETARG_TEXT_P(1);
int from_sub_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(from_sub_text)));
text *to_sub_text = PG_GETARG_TEXT_P(2);
char *to_sub_str = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(to_sub_text)));
StringInfo str = makeStringInfo();
if (src_text_len == 0 || from_sub_text_len == 0)
PG_RETURN_TEXT_P(src_text);
buf_text = DatumGetTextPCopy(PointerGetDatum(src_text));
curr_posn = DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text)));
while (curr_posn > 0)
{
left_text = DatumGetTextP(DirectFunctionCall3(text_substr, PointerGetDatum(buf_text), 1, DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))) - 1));
right_text = DatumGetTextP(DirectFunctionCall3(text_substr, PointerGetDatum(buf_text), DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))) + from_sub_text_len, -1));
appendStringInfo(str, "%s",
DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(left_text))));
appendStringInfo(str, "%s", to_sub_str);
pfree(buf_text);
pfree(left_text);
buf_text = right_text;
curr_posn = DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text)));
}
appendStringInfo(str, "%s",
DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(buf_text))));
pfree(buf_text);
ret_str = pstrdup(str->data);
ret_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(ret_str)));
PG_RETURN_TEXT_P(ret_text);
}
/*************************************************************
* internal functions
*/
/*
* init_dblink_results
* - create an empty dblink_results data structure
*/
static dblink_results *
init_dblink_results(MemoryContext fn_mcxt)
{
MemoryContext oldcontext;
dblink_results *retval;
oldcontext = MemoryContextSwitchTo(fn_mcxt);
retval = (dblink_results *) palloc(sizeof(dblink_results));
MemSet(retval, 0, sizeof(dblink_results));
retval->tup_num = -1;
retval->res_id_index =-1;
retval->res = NULL;
MemoryContextSwitchTo(oldcontext);
return retval;
}
/*
* init_dblink_array_results
* - create an empty dblink_array_results data structure
*/
static dblink_array_results *
init_dblink_array_results(MemoryContext fn_mcxt)
{
MemoryContext oldcontext;
dblink_array_results *retval;
oldcontext = MemoryContextSwitchTo(fn_mcxt);
retval = (dblink_array_results *) palloc(sizeof(dblink_array_results));
MemSet(retval, 0, sizeof(dblink_array_results));
retval->elem_num = -1;
retval->num_elems = 0;
retval->res = NULL;
MemoryContextSwitchTo(oldcontext);
return retval;
}
/*
* get_pkey_attnames
*
* Get the primary key attnames for the given relation.
* Return NULL, and set numatts = 0, if no primary key exists.
*/
static char **
get_pkey_attnames(Oid relid, int16 *numatts)
{
Relation indexRelation;
ScanKeyData entry;
HeapScanDesc scan;
HeapTuple indexTuple;
int i;
char **result = NULL;
Relation rel;
TupleDesc tupdesc;
/*
* Open relation using relid, get tupdesc
*/
rel = relation_open(relid, AccessShareLock);
tupdesc = rel->rd_att;
/*
* Initialize numatts to 0 in case no primary key
* exists
*/
*numatts = 0;
/*
* Use relid to get all related indexes
*/
indexRelation = heap_openr(IndexRelationName, AccessShareLock);
ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indrelid,
F_OIDEQ, ObjectIdGetDatum(relid));
scan = heap_beginscan(indexRelation, SnapshotNow, 1, &entry);
while ((indexTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
/*
* We're only interested if it is the primary key
*/
if (index->indisprimary == TRUE)
{
i = 0;
while (index->indkey[i++] != 0)
(*numatts)++;
if (*numatts > 0)
{
result = (char **) palloc(*numatts * sizeof(char *));
for (i = 0; i < *numatts; i++)
result[i] = SPI_fname(tupdesc, index->indkey[i]);
}
break;
}
}
heap_endscan(scan);
heap_close(indexRelation, AccessShareLock);
relation_close(rel, AccessShareLock);
return result;
}
/*
* get_strtok
*
* parse input string
* return ord item (0 based)
* based on provided field separator
*/
static char *
get_strtok(char *fldtext, char *fldsep, int fldnum)
{
int j = 0;
char *result;
if (fldnum < 0)
{
elog(ERROR, "get_strtok: field number < 0 not permitted");
}
if (fldsep[0] == '\0')
{
elog(ERROR, "get_strtok: blank field separator not permitted");
}
result = strtok(fldtext, fldsep);
for (j = 1; j < fldnum + 1; j++)
{
result = strtok(NULL, fldsep);
if (result == NULL)
return NULL;
}
return pstrdup(result);
}
static char *
get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
{
Relation rel;
char *relname;
HeapTuple tuple;
TupleDesc tupdesc;
int natts;
StringInfo str = makeStringInfo();
char *sql;
char *val;
int16 key;
int i;
bool needComma;
/*
* Open relation using relid
*/
rel = relation_open(relid, AccessShareLock);
relname = RelationGetRelationName(rel);
tupdesc = rel->rd_att;
natts = tupdesc->natts;
tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
appendStringInfo(str, "INSERT INTO %s(", quote_ident_cstr(relname));
needComma = false;
for (i = 0; i < natts; i++)
{
if (tupdesc->attrs[i]->attisdropped)
continue;
if (needComma)
appendStringInfo(str, ",");
appendStringInfo(str, "%s",
quote_ident_cstr(NameStr(tupdesc->attrs[i]->attname)));
needComma = true;
}
appendStringInfo(str, ") VALUES(");
/*
* remember attvals are 1 based
*/
needComma = false;
for (i = 0; i < natts; i++)
{
if (tupdesc->attrs[i]->attisdropped)
continue;
if (needComma)
appendStringInfo(str, ",");
if (tgt_pkattvals != NULL)
key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1);
else
key = -1;
if (key > -1)
val = pstrdup(tgt_pkattvals[key]);
else
val = SPI_getvalue(tuple, tupdesc, i + 1);
if (val != NULL)
{
appendStringInfo(str, "%s", quote_literal_cstr(val));
pfree(val);
}
else
appendStringInfo(str, "NULL");
needComma = true;
}
appendStringInfo(str, ")");
sql = pstrdup(str->data);
pfree(str->data);
pfree(str);
relation_close(rel, AccessShareLock);
return (sql);
}
static char *
get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals)
{
Relation rel;
char *relname;
TupleDesc tupdesc;
int natts;
StringInfo str = makeStringInfo();
char *sql;
char *val;
int i;
/*
* Open relation using relid
*/
rel = relation_open(relid, AccessShareLock);
relname = RelationGetRelationName(rel);
tupdesc = rel->rd_att;
natts = tupdesc->natts;
appendStringInfo(str, "DELETE FROM %s WHERE ", quote_ident_cstr(relname));
for (i = 0; i < pknumatts; i++)
{
int16 pkattnum = pkattnums[i];
if (i > 0)
appendStringInfo(str, " AND ");
appendStringInfo(str, "%s",
quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum - 1]->attname)));
if (tgt_pkattvals != NULL)
val = pstrdup(tgt_pkattvals[i]);
else
{
elog(ERROR, "Target key array must not be NULL");
val = NULL; /* keep compiler quiet */
}
if (val != NULL)
{
appendStringInfo(str, " = %s", quote_literal_cstr(val));
pfree(val);
}
else
appendStringInfo(str, " IS NULL");
}
sql = pstrdup(str->data);
pfree(str->data);
pfree(str);
relation_close(rel, AccessShareLock);
return (sql);
}
static char *
get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
{
Relation rel;
char *relname;
HeapTuple tuple;
TupleDesc tupdesc;
int natts;
StringInfo str = makeStringInfo();
char *sql;
char *val;
int16 key;
int i;
bool needComma;
/*
* Open relation using relid
*/
rel = relation_open(relid, AccessShareLock);
relname = RelationGetRelationName(rel);
tupdesc = rel->rd_att;
natts = tupdesc->natts;
tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
appendStringInfo(str, "UPDATE %s SET ", quote_ident_cstr(relname));
needComma = false;
for (i = 0; i < natts; i++)
{
if (tupdesc->attrs[i]->attisdropped)
continue;
if (needComma)
appendStringInfo(str, ", ");
appendStringInfo(str, "%s = ",
quote_ident_cstr(NameStr(tupdesc->attrs[i]->attname)));
if (tgt_pkattvals != NULL)
key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1);
else
key = -1;
if (key > -1)
val = pstrdup(tgt_pkattvals[key]);
else
val = SPI_getvalue(tuple, tupdesc, i + 1);
if (val != NULL)
{
appendStringInfo(str, "%s", quote_literal_cstr(val));
pfree(val);
}
else
appendStringInfo(str, "NULL");
needComma = true;
}
appendStringInfo(str, " WHERE ");
for (i = 0; i < pknumatts; i++)
{
int16 pkattnum = pkattnums[i];
if (i > 0)
appendStringInfo(str, " AND ");
appendStringInfo(str, "%s",
quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum - 1]->attname)));
if (tgt_pkattvals != NULL)
val = pstrdup(tgt_pkattvals[i]);
else
val = SPI_getvalue(tuple, tupdesc, pkattnum);
if (val != NULL)
{
appendStringInfo(str, " = %s", quote_literal_cstr(val));
pfree(val);
}
else
appendStringInfo(str, " IS NULL");
}
sql = pstrdup(str->data);
pfree(str->data);
pfree(str);
relation_close(rel, AccessShareLock);
return (sql);
}
/*
* Return a properly quoted literal value.
* Uses quote_literal in quote.c
*/
static char *
quote_literal_cstr(char *rawstr)
{
text *rawstr_text;
text *result_text;
char *result;
rawstr_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(rawstr)));
result_text = DatumGetTextP(DirectFunctionCall1(quote_literal, PointerGetDatum(rawstr_text)));
result = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(result_text)));
return result;
}
/*
* Return a properly quoted identifier.
* Uses quote_ident in quote.c
*/
static char *
quote_ident_cstr(char *rawstr)
{
text *rawstr_text;
text *result_text;
char *result;
rawstr_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(rawstr)));
result_text = DatumGetTextP(DirectFunctionCall1(quote_ident, PointerGetDatum(rawstr_text)));
result = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(result_text)));
return result;
}
static int16
get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key)
{
int i;
/*
* Not likely a long list anyway, so just scan for
* the value
*/
for (i = 0; i < pknumatts; i++)
if (key == pkattnums[i])
return i;
return -1;
}
static HeapTuple
get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals)
{
Relation rel;
char *relname;
TupleDesc tupdesc;
StringInfo str = makeStringInfo();
char *sql = NULL;
int ret;
HeapTuple tuple;
int i;
char *val = NULL;
/*
* Open relation using relid
*/
rel = relation_open(relid, AccessShareLock);
relname = RelationGetRelationName(rel);
tupdesc = rel->rd_att;
/*
* Connect to SPI manager
*/
if ((ret = SPI_connect()) < 0)
elog(ERROR, "get_tuple_of_interest: SPI_connect returned %d", ret);
/*
* Build sql statement to look up tuple of interest
* Use src_pkattvals as the criteria.
*/
appendStringInfo(str, "SELECT * FROM %s WHERE ", quote_ident_cstr(relname));
for (i = 0; i < pknumatts; i++)
{
int16 pkattnum = pkattnums[i];
if (i > 0)
appendStringInfo(str, " AND ");
appendStringInfo(str, "%s",
quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum - 1]->attname)));
val = pstrdup(src_pkattvals[i]);
if (val != NULL)
{
appendStringInfo(str, " = %s", quote_literal_cstr(val));
pfree(val);
}
else
appendStringInfo(str, " IS NULL");
}
sql = pstrdup(str->data);
pfree(str->data);
pfree(str);
/*
* Retrieve the desired tuple
*/
ret = SPI_exec(sql, 0);
pfree(sql);
/*
* Only allow one qualifying tuple
*/
if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
{
elog(ERROR, "get_tuple_of_interest: Source criteria may not match more than one record.");
}
else if (ret == SPI_OK_SELECT && SPI_processed == 1)
{
SPITupleTable *tuptable = SPI_tuptable;
tuple = SPI_copytuple(tuptable->vals[0]);
return tuple;
}
else
{
/*
* no qualifying tuples
*/
return NULL;
}
/*
* never reached, but keep compiler quiet
*/
return NULL;
}
static Oid
get_relid_from_relname(text *relname_text)
{
#ifdef NamespaceRelationName
RangeVar *relvar;
Relation rel;
Oid relid;
relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text, "get_relid_from_relname"));
rel = heap_openrv(relvar, AccessShareLock);
relid = RelationGetRelid(rel);
relation_close(rel, AccessShareLock);
#else
char *relname;
Relation rel;
Oid relid;
relname = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(relname_text)));
rel = relation_openr(relname, AccessShareLock);
relid = RelationGetRelid(rel);
relation_close(rel, AccessShareLock);
#endif /* NamespaceRelationName */
return relid;
}
static dblink_results *
get_res_ptr(int32 res_id_index)
{
List *ptr;
/*
* short circuit empty list
*/
if(res_id == NIL)
return NULL;
/*
* OK, should be good to go
*/
foreach(ptr, res_id)
{
dblink_results *this_res_id = (dblink_results *) lfirst(ptr);
if (this_res_id->res_id_index == res_id_index)
return this_res_id;
}
return NULL;
}
/*
* Add node to global List res_id
*/
static void
append_res_ptr(dblink_results *results)
{
res_id = lappend(res_id, results);
}
/*
* Remove node from global List
* using res_id_index
*/
static void
remove_res_ptr(dblink_results *results)
{
res_id = lremove(results, res_id);
if (res_id == NIL)
res_id_index = 0;
}