/* * dblink.c * * Functions returning results from a remote database * * Joe Conway * * Copyright (c) 2001, 2002 by PostgreSQL Global Development Group * ALL RIGHTS RESERVED; * * Permission to use, copy, modify, and distribute this software and its * documentation for any purpose, without fee, and without a written agreement * is hereby granted, provided that the above copyright notice and this * paragraph and the following two paragraphs appear in all copies. * * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. * * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES, * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. * */ #include "postgres.h" #include "libpq-fe.h" #include "fmgr.h" #include "funcapi.h" #include "access/tupdesc.h" #include "access/heapam.h" #include "catalog/catname.h" #include "catalog/namespace.h" #include "catalog/pg_index.h" #include "catalog/pg_type.h" #include "executor/executor.h" #include "executor/spi.h" #include "lib/stringinfo.h" #include "nodes/nodes.h" #include "nodes/execnodes.h" #include "nodes/pg_list.h" #include "parser/parse_type.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" #include "utils/fmgroids.h" #include "utils/array.h" #include "utils/lsyscache.h" #include "utils/syscache.h" #include "dblink.h" /* * Internal declarations */ static dblink_results *init_dblink_results(MemoryContext fn_mcxt); static char **get_pkey_attnames(Oid relid, int16 *numatts); static char *get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals); static char *get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals); static char *get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals); static char *quote_literal_cstr(char *rawstr); static char *quote_ident_cstr(char *rawstr); static int16 get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key); static HeapTuple get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals); static Oid get_relid_from_relname(text *relname_text); static dblink_results *get_res_ptr(int32 res_id_index); static void append_res_ptr(dblink_results * results); static void remove_res_ptr(dblink_results * results); static TupleDesc pgresultGetTupleDesc(PGresult *res); /* Global */ List *res_id = NIL; int res_id_index = 0; PGconn *persistent_conn = NULL; #define GET_TEXT(cstrp) DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(cstrp))) #define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp))) #define xpfree(var_) \ do { \ if (var_ != NULL) \ { \ pfree(var_); \ var_ = NULL; \ } \ } while (0) /* * Create a persistent connection to another database */ PG_FUNCTION_INFO_V1(dblink_connect); Datum dblink_connect(PG_FUNCTION_ARGS) { char *connstr = GET_STR(PG_GETARG_TEXT_P(0)); char *msg; text *result_text; MemoryContext oldcontext; if (persistent_conn != NULL) PQfinish(persistent_conn); oldcontext = MemoryContextSwitchTo(TopMemoryContext); persistent_conn = PQconnectdb(connstr); MemoryContextSwitchTo(oldcontext); if (PQstatus(persistent_conn) == CONNECTION_BAD) { msg = pstrdup(PQerrorMessage(persistent_conn)); PQfinish(persistent_conn); persistent_conn = NULL; elog(ERROR, "dblink_connect: connection error: %s", msg); } result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK"))); PG_RETURN_TEXT_P(result_text); } /* * Clear a persistent connection to another database */ PG_FUNCTION_INFO_V1(dblink_disconnect); Datum dblink_disconnect(PG_FUNCTION_ARGS) { text *result_text; if (persistent_conn != NULL) PQfinish(persistent_conn); persistent_conn = NULL; result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK"))); PG_RETURN_TEXT_P(result_text); } /* * opens a cursor using a persistent connection */ PG_FUNCTION_INFO_V1(dblink_open); Datum dblink_open(PG_FUNCTION_ARGS) { char *msg; PGresult *res = NULL; PGconn *conn = NULL; text *result_text; char *curname = GET_STR(PG_GETARG_TEXT_P(0)); char *sql = GET_STR(PG_GETARG_TEXT_P(1)); StringInfo str = makeStringInfo(); if (persistent_conn != NULL) conn = persistent_conn; else elog(ERROR, "dblink_open: no connection available"); res = PQexec(conn, "BEGIN"); if (PQresultStatus(res) != PGRES_COMMAND_OK) { msg = pstrdup(PQerrorMessage(conn)); PQclear(res); PQfinish(conn); persistent_conn = NULL; elog(ERROR, "dblink_open: begin error: %s", msg); } PQclear(res); appendStringInfo(str, "DECLARE %s CURSOR FOR %s", quote_ident_cstr(curname), sql); res = PQexec(conn, str->data); if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) { msg = pstrdup(PQerrorMessage(conn)); PQclear(res); PQfinish(conn); persistent_conn = NULL; elog(ERROR, "dblink: sql error: %s", msg); } result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK"))); PG_RETURN_TEXT_P(result_text); } /* * closes a cursor */ PG_FUNCTION_INFO_V1(dblink_close); Datum dblink_close(PG_FUNCTION_ARGS) { PGconn *conn = NULL; PGresult *res = NULL; char *curname = GET_STR(PG_GETARG_TEXT_P(0)); StringInfo str = makeStringInfo(); text *result_text; char *msg; if (persistent_conn != NULL) conn = persistent_conn; else elog(ERROR, "dblink_close: no connection available"); appendStringInfo(str, "CLOSE %s", quote_ident_cstr(curname)); /* close the cursor */ res = PQexec(conn, str->data); if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { msg = pstrdup(PQerrorMessage(conn)); PQclear(res); PQfinish(persistent_conn); persistent_conn = NULL; elog(ERROR, "dblink_close: sql error: %s", msg); } PQclear(res); /* commit the transaction */ res = PQexec(conn, "COMMIT"); if (PQresultStatus(res) != PGRES_COMMAND_OK) { msg = pstrdup(PQerrorMessage(conn)); PQclear(res); PQfinish(persistent_conn); persistent_conn = NULL; elog(ERROR, "dblink_close: commit error: %s", msg); } PQclear(res); result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK"))); PG_RETURN_TEXT_P(result_text); } /* * Fetch results from an open cursor */ PG_FUNCTION_INFO_V1(dblink_fetch); Datum dblink_fetch(PG_FUNCTION_ARGS) { FuncCallContext *funcctx; TupleDesc tupdesc = NULL; int call_cntr; int max_calls; TupleTableSlot *slot; AttInMetadata *attinmeta; char *msg; PGresult *res = NULL; MemoryContext oldcontext; /* stuff done only on the first call of the function */ if (SRF_IS_FIRSTCALL()) { Oid functypeid; char functyptype; Oid funcid = fcinfo->flinfo->fn_oid; PGconn *conn = NULL; StringInfo str = makeStringInfo(); char *curname = GET_STR(PG_GETARG_TEXT_P(0)); int howmany = PG_GETARG_INT32(1); /* create a function context for cross-call persistence */ funcctx = SRF_FIRSTCALL_INIT(); /* * switch to memory context appropriate for multiple function * calls */ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); if (persistent_conn != NULL) conn = persistent_conn; else elog(ERROR, "dblink_fetch: no connection available"); appendStringInfo(str, "FETCH %d FROM %s", howmany, quote_ident_cstr(curname)); res = PQexec(conn, str->data); if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) { msg = pstrdup(PQerrorMessage(conn)); PQclear(res); PQfinish(persistent_conn); persistent_conn = NULL; elog(ERROR, "dblink_fetch: sql error: %s", msg); } else if (PQresultStatus(res) == PGRES_COMMAND_OK) { /* cursor does not exist - closed already or bad name */ PQclear(res); elog(ERROR, "dblink_fetch: cursor %s does not exist", quote_ident_cstr(curname)); } funcctx->max_calls = PQntuples(res); /* got results, keep track of them */ funcctx->user_fctx = res; /* fast track when no results */ if (funcctx->max_calls < 1) SRF_RETURN_DONE(funcctx); /* check typtype to see if we have a predetermined return type */ functypeid = get_func_rettype(funcid); functyptype = get_typtype(functypeid); if (functyptype == 'c') tupdesc = TypeGetTupleDesc(functypeid, NIL); else if (functyptype == 'p' && functypeid == RECORDOID) tupdesc = pgresultGetTupleDesc(res); else if (functyptype == 'b') elog(ERROR, "dblink_fetch: invalid kind of return type specified for function"); else elog(ERROR, "dblink_fetch: unknown kind of return type specified for function"); /* store needed metadata for subsequent calls */ slot = TupleDescGetSlot(tupdesc); funcctx->slot = slot; attinmeta = TupleDescGetAttInMetadata(tupdesc); funcctx->attinmeta = attinmeta; MemoryContextSwitchTo(oldcontext); } /* stuff done on every call of the function */ funcctx = SRF_PERCALL_SETUP(); /* * initialize per-call variables */ call_cntr = funcctx->call_cntr; max_calls = funcctx->max_calls; slot = funcctx->slot; res = (PGresult *) funcctx->user_fctx; attinmeta = funcctx->attinmeta; tupdesc = attinmeta->tupdesc; if (call_cntr < max_calls) /* do when there is more left to send */ { char **values; HeapTuple tuple; Datum result; int i; int nfields = PQnfields(res); values = (char **) palloc(nfields * sizeof(char *)); for (i = 0; i < nfields; i++) { if (PQgetisnull(res, call_cntr, i) == 0) values[i] = PQgetvalue(res, call_cntr, i); else values[i] = NULL; } /* build the tuple */ tuple = BuildTupleFromCStrings(attinmeta, values); /* make the tuple into a datum */ result = TupleGetDatum(slot, tuple); SRF_RETURN_NEXT(funcctx, result); } else /* do when there is no more left */ { PQclear(res); SRF_RETURN_DONE(funcctx); } } /* * Note: this is the new preferred version of dblink */ PG_FUNCTION_INFO_V1(dblink_record); Datum dblink_record(PG_FUNCTION_ARGS) { FuncCallContext *funcctx; TupleDesc tupdesc = NULL; int call_cntr; int max_calls; TupleTableSlot *slot; AttInMetadata *attinmeta; char *msg; PGresult *res = NULL; bool is_sql_cmd = false; char *sql_cmd_status = NULL; MemoryContext oldcontext; /* stuff done only on the first call of the function */ if (SRF_IS_FIRSTCALL()) { Oid functypeid; char functyptype; Oid funcid = fcinfo->flinfo->fn_oid; PGconn *conn = NULL; char *connstr = NULL; char *sql = NULL; /* create a function context for cross-call persistence */ funcctx = SRF_FIRSTCALL_INIT(); /* * switch to memory context appropriate for multiple function * calls */ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); if (fcinfo->nargs == 2) { connstr = GET_STR(PG_GETARG_TEXT_P(0)); sql = GET_STR(PG_GETARG_TEXT_P(1)); conn = PQconnectdb(connstr); if (PQstatus(conn) == CONNECTION_BAD) { msg = pstrdup(PQerrorMessage(conn)); PQfinish(conn); elog(ERROR, "dblink: connection error: %s", msg); } } else if (fcinfo->nargs == 1) { sql = GET_STR(PG_GETARG_TEXT_P(0)); if (persistent_conn != NULL) conn = persistent_conn; else elog(ERROR, "dblink: no connection available"); } else elog(ERROR, "dblink: wrong number of arguments"); res = PQexec(conn, sql); if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) { msg = pstrdup(PQerrorMessage(conn)); PQclear(res); PQfinish(conn); if (fcinfo->nargs == 1) persistent_conn = NULL; elog(ERROR, "dblink: sql error: %s", msg); } else { if (PQresultStatus(res) == PGRES_COMMAND_OK) { is_sql_cmd = true; /* need a tuple descriptor representing one TEXT column */ tupdesc = CreateTemplateTupleDesc(1, false); TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", TEXTOID, -1, 0, false); /* * and save a copy of the command status string to return * as our result tuple */ sql_cmd_status = PQcmdStatus(res); funcctx->max_calls = 1; } else funcctx->max_calls = PQntuples(res); /* got results, keep track of them */ funcctx->user_fctx = res; /* if needed, close the connection to the database and cleanup */ if (fcinfo->nargs == 2) PQfinish(conn); } /* fast track when no results */ if (funcctx->max_calls < 1) SRF_RETURN_DONE(funcctx); /* check typtype to see if we have a predetermined return type */ functypeid = get_func_rettype(funcid); functyptype = get_typtype(functypeid); if (!is_sql_cmd) { if (functyptype == 'c') tupdesc = TypeGetTupleDesc(functypeid, NIL); else if (functyptype == 'p' && functypeid == RECORDOID) tupdesc = pgresultGetTupleDesc(res); else if (functyptype == 'b') elog(ERROR, "Invalid kind of return type specified for function"); else elog(ERROR, "Unknown kind of return type specified for function"); } /* store needed metadata for subsequent calls */ slot = TupleDescGetSlot(tupdesc); funcctx->slot = slot; attinmeta = TupleDescGetAttInMetadata(tupdesc); funcctx->attinmeta = attinmeta; MemoryContextSwitchTo(oldcontext); } /* stuff done on every call of the function */ funcctx = SRF_PERCALL_SETUP(); /* * initialize per-call variables */ call_cntr = funcctx->call_cntr; max_calls = funcctx->max_calls; slot = funcctx->slot; res = (PGresult *) funcctx->user_fctx; attinmeta = funcctx->attinmeta; tupdesc = attinmeta->tupdesc; if (call_cntr < max_calls) /* do when there is more left to send */ { char **values; HeapTuple tuple; Datum result; if (!is_sql_cmd) { int i; int nfields = PQnfields(res); values = (char **) palloc(nfields * sizeof(char *)); for (i = 0; i < nfields; i++) { if (PQgetisnull(res, call_cntr, i) == 0) values[i] = PQgetvalue(res, call_cntr, i); else values[i] = NULL; } } else { values = (char **) palloc(1 * sizeof(char *)); values[0] = sql_cmd_status; } /* build the tuple */ tuple = BuildTupleFromCStrings(attinmeta, values); /* make the tuple into a datum */ result = TupleGetDatum(slot, tuple); SRF_RETURN_NEXT(funcctx, result); } else /* do when there is no more left */ { PQclear(res); SRF_RETURN_DONE(funcctx); } } /* * Execute an SQL non-SELECT command */ PG_FUNCTION_INFO_V1(dblink_exec); Datum dblink_exec(PG_FUNCTION_ARGS) { char *msg; PGresult *res = NULL; char *sql_cmd_status = NULL; TupleDesc tupdesc = NULL; text *result_text; PGconn *conn = NULL; char *connstr = NULL; char *sql = NULL; if (fcinfo->nargs == 2) { connstr = GET_STR(PG_GETARG_TEXT_P(0)); sql = GET_STR(PG_GETARG_TEXT_P(1)); conn = PQconnectdb(connstr); if (PQstatus(conn) == CONNECTION_BAD) { msg = pstrdup(PQerrorMessage(conn)); PQfinish(conn); elog(ERROR, "dblink_exec: connection error: %s", msg); } } else if (fcinfo->nargs == 1) { sql = GET_STR(PG_GETARG_TEXT_P(0)); if (persistent_conn != NULL) conn = persistent_conn; else elog(ERROR, "dblink_exec: no connection available"); } else elog(ERROR, "dblink_exec: wrong number of arguments"); res = PQexec(conn, sql); if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) { msg = pstrdup(PQerrorMessage(conn)); PQclear(res); PQfinish(conn); if (fcinfo->nargs == 1) persistent_conn = NULL; elog(ERROR, "dblink_exec: sql error: %s", msg); } else { if (PQresultStatus(res) == PGRES_COMMAND_OK) { /* need a tuple descriptor representing one TEXT column */ tupdesc = CreateTemplateTupleDesc(1, false); TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", TEXTOID, -1, 0, false); /* * and save a copy of the command status string to return as * our result tuple */ sql_cmd_status = PQcmdStatus(res); } else elog(ERROR, "dblink_exec: queries returning results not allowed"); } PQclear(res); /* if needed, close the connection to the database and cleanup */ if (fcinfo->nargs == 2) PQfinish(conn); result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql_cmd_status))); PG_RETURN_TEXT_P(result_text); } /* * Note: this original version of dblink is DEPRECATED; * it *will* be removed in favor of the new version on next release */ PG_FUNCTION_INFO_V1(dblink); Datum dblink(PG_FUNCTION_ARGS) { PGconn *conn = NULL; PGresult *res = NULL; dblink_results *results; char *optstr; char *sqlstatement; char *execstatement; char *msg; int ntuples = 0; ReturnSetInfo *rsi; if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo)) elog(ERROR, "dblink: function called in context that does not accept a set result"); optstr = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0)))); sqlstatement = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1)))); if (fcinfo->flinfo->fn_extra == NULL) { conn = PQconnectdb(optstr); if (PQstatus(conn) == CONNECTION_BAD) { msg = pstrdup(PQerrorMessage(conn)); PQfinish(conn); elog(ERROR, "dblink: connection error: %s", msg); } execstatement = (char *) palloc(strlen(sqlstatement) + 1); if (execstatement != NULL) { strcpy(execstatement, sqlstatement); strcat(execstatement, "\0"); } else elog(ERROR, "dblink: insufficient memory"); res = PQexec(conn, execstatement); if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) { msg = pstrdup(PQerrorMessage(conn)); PQclear(res); PQfinish(conn); elog(ERROR, "dblink: sql error: %s", msg); } else { /* * got results, start fetching them */ ntuples = PQntuples(res); /* * increment resource index */ res_id_index++; results = init_dblink_results(fcinfo->flinfo->fn_mcxt); results->tup_num = 0; results->res_id_index = res_id_index; results->res = res; /* * Append node to res_id to hold pointer to results. Needed by * dblink_tok to access the data */ append_res_ptr(results); /* * save pointer to results for the next function manager call */ fcinfo->flinfo->fn_extra = (void *) results; /* close the connection to the database and cleanup */ PQfinish(conn); rsi = (ReturnSetInfo *) fcinfo->resultinfo; rsi->isDone = ExprMultipleResult; PG_RETURN_INT32(res_id_index); } } else { /* * check for more results */ results = fcinfo->flinfo->fn_extra; results->tup_num++; res_id_index = results->res_id_index; ntuples = PQntuples(results->res); if (results->tup_num < ntuples) { /* * fetch them if available */ rsi = (ReturnSetInfo *) fcinfo->resultinfo; rsi->isDone = ExprMultipleResult; PG_RETURN_INT32(res_id_index); } else { /* * or if no more, clean things up */ results = fcinfo->flinfo->fn_extra; remove_res_ptr(results); PQclear(results->res); pfree(results); fcinfo->flinfo->fn_extra = NULL; rsi = (ReturnSetInfo *) fcinfo->resultinfo; rsi->isDone = ExprEndResult; PG_RETURN_NULL(); } } PG_RETURN_NULL(); } /* * Note: dblink_tok is DEPRECATED; * it *will* be removed in favor of the new version on next release * * dblink_tok * parse dblink output string * return fldnum item (0 based) * based on provided field separator */ PG_FUNCTION_INFO_V1(dblink_tok); Datum dblink_tok(PG_FUNCTION_ARGS) { dblink_results *results; int fldnum; text *result_text; char *result; int nfields = 0; int text_len = 0; results = get_res_ptr(PG_GETARG_INT32(0)); if (results == NULL) { if (res_id != NIL) { freeList(res_id); res_id = NIL; res_id_index = 0; } elog(ERROR, "dblink_tok: function called with invalid resource id"); } fldnum = PG_GETARG_INT32(1); if (fldnum < 0) elog(ERROR, "dblink_tok: field number < 0 not permitted"); nfields = PQnfields(results->res); if (fldnum > (nfields - 1)) elog(ERROR, "dblink_tok: field number %d does not exist", fldnum); if (PQgetisnull(results->res, results->tup_num, fldnum) == 1) PG_RETURN_NULL(); else { text_len = PQgetlength(results->res, results->tup_num, fldnum); result = (char *) palloc(text_len + 1); if (result != NULL) { strcpy(result, PQgetvalue(results->res, results->tup_num, fldnum)); strcat(result, "\0"); } else elog(ERROR, "dblink: insufficient memory"); result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result))); PG_RETURN_TEXT_P(result_text); } } /* * dblink_get_pkey * * Return list of primary key fields for the supplied relation, * or NULL if none exists. */ PG_FUNCTION_INFO_V1(dblink_get_pkey); Datum dblink_get_pkey(PG_FUNCTION_ARGS) { int16 numatts; Oid relid; char **results; FuncCallContext *funcctx; int32 call_cntr; int32 max_calls; TupleTableSlot *slot; AttInMetadata *attinmeta; MemoryContext oldcontext; /* stuff done only on the first call of the function */ if (SRF_IS_FIRSTCALL()) { TupleDesc tupdesc = NULL; /* create a function context for cross-call persistence */ funcctx = SRF_FIRSTCALL_INIT(); /* * switch to memory context appropriate for multiple function * calls */ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); /* convert relname to rel Oid */ relid = get_relid_from_relname(PG_GETARG_TEXT_P(0)); if (!OidIsValid(relid)) elog(ERROR, "dblink_get_pkey: relation does not exist"); /* * need a tuple descriptor representing one INT and one TEXT * column */ tupdesc = CreateTemplateTupleDesc(2, false); TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position", INT4OID, -1, 0, false); TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname", TEXTOID, -1, 0, false); /* allocate a slot for a tuple with this tupdesc */ slot = TupleDescGetSlot(tupdesc); /* assign slot to function context */ funcctx->slot = slot; /* * Generate attribute metadata needed later to produce tuples from * raw C strings */ attinmeta = TupleDescGetAttInMetadata(tupdesc); funcctx->attinmeta = attinmeta; /* get an array of attnums */ results = get_pkey_attnames(relid, &numatts); if ((results != NULL) && (numatts > 0)) { funcctx->max_calls = numatts; /* got results, keep track of them */ funcctx->user_fctx = results; } else /* fast track when no results */ SRF_RETURN_DONE(funcctx); MemoryContextSwitchTo(oldcontext); } /* stuff done on every call of the function */ funcctx = SRF_PERCALL_SETUP(); /* * initialize per-call variables */ call_cntr = funcctx->call_cntr; max_calls = funcctx->max_calls; slot = funcctx->slot; results = (char **) funcctx->user_fctx; attinmeta = funcctx->attinmeta; if (call_cntr < max_calls) /* do when there is more left to send */ { char **values; HeapTuple tuple; Datum result; values = (char **) palloc(2 * sizeof(char *)); values[0] = (char *) palloc(12); /* sign, 10 digits, '\0' */ sprintf(values[0], "%d", call_cntr + 1); values[1] = results[call_cntr]; /* build the tuple */ tuple = BuildTupleFromCStrings(attinmeta, values); /* make the tuple into a datum */ result = TupleGetDatum(slot, tuple); SRF_RETURN_NEXT(funcctx, result); } else /* do when there is no more left */ SRF_RETURN_DONE(funcctx); } /* * Note: dblink_last_oid is DEPRECATED; * it *will* be removed on next release * * dblink_last_oid * return last inserted oid */ PG_FUNCTION_INFO_V1(dblink_last_oid); Datum dblink_last_oid(PG_FUNCTION_ARGS) { dblink_results *results; results = get_res_ptr(PG_GETARG_INT32(0)); if (results == NULL) { if (res_id != NIL) { freeList(res_id); res_id = NIL; res_id_index = 0; } elog(ERROR, "dblink_tok: function called with invalid resource id"); } PG_RETURN_OID(PQoidValue(results->res)); } /* * dblink_build_sql_insert * * Used to generate an SQL insert statement * based on an existing tuple in a local relation. * This is useful for selectively replicating data * to another server via dblink. * * API: * - name of local table of interest * - an int2vector of attnums which will be used * to identify the local tuple of interest * - number of attnums in pkattnums * - text array of key values which will be used * to identify the local tuple of interest * - text array of key values which will be used * to build the string for execution remotely. These are substituted * for their counterparts in src_pkattvals_arry */ PG_FUNCTION_INFO_V1(dblink_build_sql_insert); Datum dblink_build_sql_insert(PG_FUNCTION_ARGS) { Oid relid; text *relname_text; int16 *pkattnums; int16 pknumatts; char **src_pkattvals; char **tgt_pkattvals; ArrayType *src_pkattvals_arry; ArrayType *tgt_pkattvals_arry; int src_ndim; int *src_dim; int src_nitems; int tgt_ndim; int *tgt_dim; int tgt_nitems; int i; char *ptr; char *sql; text *sql_text; int16 typlen; bool typbyval; char typalign; relname_text = PG_GETARG_TEXT_P(0); /* * Convert relname to rel OID. */ relid = get_relid_from_relname(relname_text); if (!OidIsValid(relid)) elog(ERROR, "dblink_build_sql_insert: relation does not exist"); pkattnums = (int16 *) PG_GETARG_POINTER(1); pknumatts = PG_GETARG_INT16(2); /* * There should be at least one key attribute */ if (pknumatts == 0) elog(ERROR, "dblink_build_sql_insert: number of key attributes must be > 0."); src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3); tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4); /* * Source array is made up of key values that will be used to locate * the tuple of interest from the local system. */ src_ndim = ARR_NDIM(src_pkattvals_arry); src_dim = ARR_DIMS(src_pkattvals_arry); src_nitems = ArrayGetNItems(src_ndim, src_dim); /* * There should be one source array key value for each key attnum */ if (src_nitems != pknumatts) elog(ERROR, "dblink_build_sql_insert: source key array length does not match number of key attributes."); /* * get array of pointers to c-strings from the input source array */ Assert(ARR_ELEMTYPE(src_pkattvals_arry) == TEXTOID); get_typlenbyvalalign(ARR_ELEMTYPE(src_pkattvals_arry), &typlen, &typbyval, &typalign); src_pkattvals = (char **) palloc(src_nitems * sizeof(char *)); ptr = ARR_DATA_PTR(src_pkattvals_arry); for (i = 0; i < src_nitems; i++) { src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr)); ptr = (char *) att_align(ptr, typalign); } /* * Target array is made up of key values that will be used to build * the SQL string for use on the remote system. */ tgt_ndim = ARR_NDIM(tgt_pkattvals_arry); tgt_dim = ARR_DIMS(tgt_pkattvals_arry); tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim); /* * There should be one target array key value for each key attnum */ if (tgt_nitems != pknumatts) elog(ERROR, "dblink_build_sql_insert: target key array length does not match number of key attributes."); /* * get array of pointers to c-strings from the input target array */ Assert(ARR_ELEMTYPE(tgt_pkattvals_arry) == TEXTOID); get_typlenbyvalalign(ARR_ELEMTYPE(tgt_pkattvals_arry), &typlen, &typbyval, &typalign); tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *)); ptr = ARR_DATA_PTR(tgt_pkattvals_arry); for (i = 0; i < tgt_nitems; i++) { tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr)); ptr = (char *) att_align(ptr, typalign); } /* * Prep work is finally done. Go get the SQL string. */ sql = get_sql_insert(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals); /* * Make it into TEXT for return to the client */ sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql))); /* * And send it */ PG_RETURN_TEXT_P(sql_text); } /* * dblink_build_sql_delete * * Used to generate an SQL delete statement. * This is useful for selectively replicating a * delete to another server via dblink. * * API: * - name of remote table of interest * - an int2vector of attnums which will be used * to identify the remote tuple of interest * - number of attnums in pkattnums * - text array of key values which will be used * to build the string for execution remotely. */ PG_FUNCTION_INFO_V1(dblink_build_sql_delete); Datum dblink_build_sql_delete(PG_FUNCTION_ARGS) { Oid relid; text *relname_text; int16 *pkattnums; int16 pknumatts; char **tgt_pkattvals; ArrayType *tgt_pkattvals_arry; int tgt_ndim; int *tgt_dim; int tgt_nitems; int i; char *ptr; char *sql; text *sql_text; int16 typlen; bool typbyval; char typalign; relname_text = PG_GETARG_TEXT_P(0); /* * Convert relname to rel OID. */ relid = get_relid_from_relname(relname_text); if (!OidIsValid(relid)) elog(ERROR, "dblink_build_sql_delete: relation does not exist"); pkattnums = (int16 *) PG_GETARG_POINTER(1); pknumatts = PG_GETARG_INT16(2); /* * There should be at least one key attribute */ if (pknumatts == 0) elog(ERROR, "dblink_build_sql_insert: number of key attributes must be > 0."); tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3); /* * Target array is made up of key values that will be used to build * the SQL string for use on the remote system. */ tgt_ndim = ARR_NDIM(tgt_pkattvals_arry); tgt_dim = ARR_DIMS(tgt_pkattvals_arry); tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim); /* * There should be one target array key value for each key attnum */ if (tgt_nitems != pknumatts) elog(ERROR, "dblink_build_sql_insert: target key array length does not match number of key attributes."); /* * get array of pointers to c-strings from the input target array */ Assert(ARR_ELEMTYPE(tgt_pkattvals_arry) == TEXTOID); get_typlenbyvalalign(ARR_ELEMTYPE(tgt_pkattvals_arry), &typlen, &typbyval, &typalign); tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *)); ptr = ARR_DATA_PTR(tgt_pkattvals_arry); for (i = 0; i < tgt_nitems; i++) { tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr)); ptr = (char *) att_align(ptr, typalign); } /* * Prep work is finally done. Go get the SQL string. */ sql = get_sql_delete(relid, pkattnums, pknumatts, tgt_pkattvals); /* * Make it into TEXT for return to the client */ sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql))); /* * And send it */ PG_RETURN_TEXT_P(sql_text); } /* * dblink_build_sql_update * * Used to generate an SQL update statement * based on an existing tuple in a local relation. * This is useful for selectively replicating data * to another server via dblink. * * API: * - name of local table of interest * - an int2vector of attnums which will be used * to identify the local tuple of interest * - number of attnums in pkattnums * - text array of key values which will be used * to identify the local tuple of interest * - text array of key values which will be used * to build the string for execution remotely. These are substituted * for their counterparts in src_pkattvals_arry */ PG_FUNCTION_INFO_V1(dblink_build_sql_update); Datum dblink_build_sql_update(PG_FUNCTION_ARGS) { Oid relid; text *relname_text; int16 *pkattnums; int16 pknumatts; char **src_pkattvals; char **tgt_pkattvals; ArrayType *src_pkattvals_arry; ArrayType *tgt_pkattvals_arry; int src_ndim; int *src_dim; int src_nitems; int tgt_ndim; int *tgt_dim; int tgt_nitems; int i; char *ptr; char *sql; text *sql_text; int16 typlen; bool typbyval; char typalign; relname_text = PG_GETARG_TEXT_P(0); /* * Convert relname to rel OID. */ relid = get_relid_from_relname(relname_text); if (!OidIsValid(relid)) elog(ERROR, "dblink_build_sql_update: relation does not exist"); pkattnums = (int16 *) PG_GETARG_POINTER(1); pknumatts = PG_GETARG_INT16(2); /* * There should be one source array key values for each key attnum */ if (pknumatts == 0) elog(ERROR, "dblink_build_sql_insert: number of key attributes must be > 0."); src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3); tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4); /* * Source array is made up of key values that will be used to locate * the tuple of interest from the local system. */ src_ndim = ARR_NDIM(src_pkattvals_arry); src_dim = ARR_DIMS(src_pkattvals_arry); src_nitems = ArrayGetNItems(src_ndim, src_dim); /* * There should be one source array key value for each key attnum */ if (src_nitems != pknumatts) elog(ERROR, "dblink_build_sql_insert: source key array length does not match number of key attributes."); /* * get array of pointers to c-strings from the input source array */ Assert(ARR_ELEMTYPE(src_pkattvals_arry) == TEXTOID); get_typlenbyvalalign(ARR_ELEMTYPE(src_pkattvals_arry), &typlen, &typbyval, &typalign); src_pkattvals = (char **) palloc(src_nitems * sizeof(char *)); ptr = ARR_DATA_PTR(src_pkattvals_arry); for (i = 0; i < src_nitems; i++) { src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr)); ptr = (char *) att_align(ptr, typalign); } /* * Target array is made up of key values that will be used to build * the SQL string for use on the remote system. */ tgt_ndim = ARR_NDIM(tgt_pkattvals_arry); tgt_dim = ARR_DIMS(tgt_pkattvals_arry); tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim); /* * There should be one target array key value for each key attnum */ if (tgt_nitems != pknumatts) elog(ERROR, "dblink_build_sql_insert: target key array length does not match number of key attributes."); /* * get array of pointers to c-strings from the input target array */ Assert(ARR_ELEMTYPE(tgt_pkattvals_arry) == TEXTOID); get_typlenbyvalalign(ARR_ELEMTYPE(tgt_pkattvals_arry), &typlen, &typbyval, &typalign); tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *)); ptr = ARR_DATA_PTR(tgt_pkattvals_arry); for (i = 0; i < tgt_nitems; i++) { tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); ptr = att_addlength(ptr, typlen, PointerGetDatum(ptr)); ptr = (char *) att_align(ptr, typalign); } /* * Prep work is finally done. Go get the SQL string. */ sql = get_sql_update(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals); /* * Make it into TEXT for return to the client */ sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql))); /* * And send it */ PG_RETURN_TEXT_P(sql_text); } /* * dblink_current_query * return the current query string * to allow its use in (among other things) * rewrite rules */ PG_FUNCTION_INFO_V1(dblink_current_query); Datum dblink_current_query(PG_FUNCTION_ARGS) { text *result_text; result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(debug_query_string))); PG_RETURN_TEXT_P(result_text); } /************************************************************* * internal functions */ /* * init_dblink_results * - create an empty dblink_results data structure */ static dblink_results * init_dblink_results(MemoryContext fn_mcxt) { MemoryContext oldcontext; dblink_results *retval; oldcontext = MemoryContextSwitchTo(fn_mcxt); retval = (dblink_results *) palloc(sizeof(dblink_results)); MemSet(retval, 0, sizeof(dblink_results)); retval->tup_num = -1; retval->res_id_index = -1; retval->res = NULL; MemoryContextSwitchTo(oldcontext); return retval; } /* * get_pkey_attnames * * Get the primary key attnames for the given relation. * Return NULL, and set numatts = 0, if no primary key exists. */ static char ** get_pkey_attnames(Oid relid, int16 *numatts) { Relation indexRelation; ScanKeyData entry; HeapScanDesc scan; HeapTuple indexTuple; int i; char **result = NULL; Relation rel; TupleDesc tupdesc; /* open relation using relid, get tupdesc */ rel = relation_open(relid, AccessShareLock); tupdesc = rel->rd_att; /* initialize numatts to 0 in case no primary key exists */ *numatts = 0; /* use relid to get all related indexes */ indexRelation = heap_openr(IndexRelationName, AccessShareLock); ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indrelid, F_OIDEQ, ObjectIdGetDatum(relid)); scan = heap_beginscan(indexRelation, SnapshotNow, 1, &entry); while ((indexTuple = heap_getnext(scan, ForwardScanDirection)) != NULL) { Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple); /* we're only interested if it is the primary key */ if (index->indisprimary == TRUE) { i = 0; while (index->indkey[i++] != 0) (*numatts)++; if (*numatts > 0) { result = (char **) palloc(*numatts * sizeof(char *)); for (i = 0; i < *numatts; i++) result[i] = SPI_fname(tupdesc, index->indkey[i]); } break; } } heap_endscan(scan); heap_close(indexRelation, AccessShareLock); relation_close(rel, AccessShareLock); return result; } static char * get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals) { Relation rel; char *relname; HeapTuple tuple; TupleDesc tupdesc; int natts; StringInfo str = makeStringInfo(); char *sql; char *val; int16 key; int i; bool needComma; /* * Open relation using relid */ rel = relation_open(relid, AccessShareLock); relname = RelationGetRelationName(rel); tupdesc = rel->rd_att; natts = tupdesc->natts; tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals); if (!tuple) elog(ERROR, "dblink_build_sql_insert: row not found"); appendStringInfo(str, "INSERT INTO %s(", quote_ident_cstr(relname)); needComma = false; for (i = 0; i < natts; i++) { if (tupdesc->attrs[i]->attisdropped) continue; if (needComma) appendStringInfo(str, ","); appendStringInfo(str, "%s", quote_ident_cstr(NameStr(tupdesc->attrs[i]->attname))); needComma = true; } appendStringInfo(str, ") VALUES("); /* * remember attvals are 1 based */ needComma = false; for (i = 0; i < natts; i++) { if (tupdesc->attrs[i]->attisdropped) continue; if (needComma) appendStringInfo(str, ","); if (tgt_pkattvals != NULL) key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1); else key = -1; if (key > -1) val = pstrdup(tgt_pkattvals[key]); else val = SPI_getvalue(tuple, tupdesc, i + 1); if (val != NULL) { appendStringInfo(str, "%s", quote_literal_cstr(val)); pfree(val); } else appendStringInfo(str, "NULL"); needComma = true; } appendStringInfo(str, ")"); sql = pstrdup(str->data); pfree(str->data); pfree(str); relation_close(rel, AccessShareLock); return (sql); } static char * get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals) { Relation rel; char *relname; TupleDesc tupdesc; int natts; StringInfo str = makeStringInfo(); char *sql; char *val; int i; /* * Open relation using relid */ rel = relation_open(relid, AccessShareLock); relname = RelationGetRelationName(rel); tupdesc = rel->rd_att; natts = tupdesc->natts; appendStringInfo(str, "DELETE FROM %s WHERE ", quote_ident_cstr(relname)); for (i = 0; i < pknumatts; i++) { int16 pkattnum = pkattnums[i]; if (i > 0) appendStringInfo(str, " AND "); appendStringInfo(str, "%s", quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum - 1]->attname))); if (tgt_pkattvals != NULL) val = pstrdup(tgt_pkattvals[i]); else { elog(ERROR, "Target key array must not be NULL"); val = NULL; /* keep compiler quiet */ } if (val != NULL) { appendStringInfo(str, " = %s", quote_literal_cstr(val)); pfree(val); } else appendStringInfo(str, " IS NULL"); } sql = pstrdup(str->data); pfree(str->data); pfree(str); relation_close(rel, AccessShareLock); return (sql); } static char * get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals) { Relation rel; char *relname; HeapTuple tuple; TupleDesc tupdesc; int natts; StringInfo str = makeStringInfo(); char *sql; char *val; int16 key; int i; bool needComma; /* * Open relation using relid */ rel = relation_open(relid, AccessShareLock); relname = RelationGetRelationName(rel); tupdesc = rel->rd_att; natts = tupdesc->natts; tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals); if (!tuple) elog(ERROR, "dblink_build_sql_update: row not found"); appendStringInfo(str, "UPDATE %s SET ", quote_ident_cstr(relname)); needComma = false; for (i = 0; i < natts; i++) { if (tupdesc->attrs[i]->attisdropped) continue; if (needComma) appendStringInfo(str, ", "); appendStringInfo(str, "%s = ", quote_ident_cstr(NameStr(tupdesc->attrs[i]->attname))); if (tgt_pkattvals != NULL) key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1); else key = -1; if (key > -1) val = pstrdup(tgt_pkattvals[key]); else val = SPI_getvalue(tuple, tupdesc, i + 1); if (val != NULL) { appendStringInfo(str, "%s", quote_literal_cstr(val)); pfree(val); } else appendStringInfo(str, "NULL"); needComma = true; } appendStringInfo(str, " WHERE "); for (i = 0; i < pknumatts; i++) { int16 pkattnum = pkattnums[i]; if (i > 0) appendStringInfo(str, " AND "); appendStringInfo(str, "%s", quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum - 1]->attname))); if (tgt_pkattvals != NULL) val = pstrdup(tgt_pkattvals[i]); else val = SPI_getvalue(tuple, tupdesc, pkattnum); if (val != NULL) { appendStringInfo(str, " = %s", quote_literal_cstr(val)); pfree(val); } else appendStringInfo(str, " IS NULL"); } sql = pstrdup(str->data); pfree(str->data); pfree(str); relation_close(rel, AccessShareLock); return (sql); } /* * Return a properly quoted literal value. * Uses quote_literal in quote.c */ static char * quote_literal_cstr(char *rawstr) { text *rawstr_text; text *result_text; char *result; rawstr_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(rawstr))); result_text = DatumGetTextP(DirectFunctionCall1(quote_literal, PointerGetDatum(rawstr_text))); result = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(result_text))); return result; } /* * Return a properly quoted identifier. * Uses quote_ident in quote.c */ static char * quote_ident_cstr(char *rawstr) { text *rawstr_text; text *result_text; char *result; rawstr_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(rawstr))); result_text = DatumGetTextP(DirectFunctionCall1(quote_ident, PointerGetDatum(rawstr_text))); result = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(result_text))); return result; } static int16 get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key) { int i; /* * Not likely a long list anyway, so just scan for the value */ for (i = 0; i < pknumatts; i++) if (key == pkattnums[i]) return i; return -1; } static HeapTuple get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals) { Relation rel; char *relname; TupleDesc tupdesc; StringInfo str = makeStringInfo(); char *sql = NULL; int ret; HeapTuple tuple; int i; char *val = NULL; /* * Open relation using relid */ rel = relation_open(relid, AccessShareLock); relname = RelationGetRelationName(rel); tupdesc = CreateTupleDescCopy(rel->rd_att); relation_close(rel, AccessShareLock); /* * Connect to SPI manager */ if ((ret = SPI_connect()) < 0) elog(ERROR, "get_tuple_of_interest: SPI_connect returned %d", ret); /* * Build sql statement to look up tuple of interest Use src_pkattvals * as the criteria. */ appendStringInfo(str, "SELECT * FROM %s WHERE ", quote_ident_cstr(relname)); for (i = 0; i < pknumatts; i++) { int16 pkattnum = pkattnums[i]; if (i > 0) appendStringInfo(str, " AND "); appendStringInfo(str, "%s", quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum - 1]->attname))); val = pstrdup(src_pkattvals[i]); if (val != NULL) { appendStringInfo(str, " = %s", quote_literal_cstr(val)); pfree(val); } else appendStringInfo(str, " IS NULL"); } sql = pstrdup(str->data); pfree(str->data); pfree(str); /* * Retrieve the desired tuple */ ret = SPI_exec(sql, 0); pfree(sql); /* * Only allow one qualifying tuple */ if ((ret == SPI_OK_SELECT) && (SPI_processed > 1)) elog(ERROR, "get_tuple_of_interest: Source criteria may not match more than one record."); else if (ret == SPI_OK_SELECT && SPI_processed == 1) { SPITupleTable *tuptable = SPI_tuptable; tuple = SPI_copytuple(tuptable->vals[0]); return tuple; } else { /* * no qualifying tuples */ return NULL; } /* * never reached, but keep compiler quiet */ return NULL; } static Oid get_relid_from_relname(text *relname_text) { RangeVar *relvar; Relation rel; Oid relid; relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text, "get_relid_from_relname")); rel = heap_openrv(relvar, AccessShareLock); relid = RelationGetRelid(rel); relation_close(rel, AccessShareLock); return relid; } static dblink_results * get_res_ptr(int32 res_id_index) { List *ptr; /* * short circuit empty list */ if (res_id == NIL) return NULL; /* * OK, should be good to go */ foreach(ptr, res_id) { dblink_results *this_res_id = (dblink_results *) lfirst(ptr); if (this_res_id->res_id_index == res_id_index) return this_res_id; } return NULL; } /* * Add node to global List res_id */ static void append_res_ptr(dblink_results * results) { res_id = lappend(res_id, results); } /* * Remove node from global List * using res_id_index */ static void remove_res_ptr(dblink_results * results) { res_id = lremove(results, res_id); if (res_id == NIL) res_id_index = 0; } static TupleDesc pgresultGetTupleDesc(PGresult *res) { int natts; AttrNumber attnum; TupleDesc desc; char *attname; int32 atttypmod; int attdim; bool attisset; Oid atttypid; int i; /* * allocate a new tuple descriptor */ natts = PQnfields(res); if (natts < 1) elog(ERROR, "cannot create a description for empty results"); desc = CreateTemplateTupleDesc(natts, false); attnum = 0; for (i = 0; i < natts; i++) { /* * for each field, get the name and type information from the * query result and have TupleDescInitEntry fill in the attribute * information we need. */ attnum++; attname = PQfname(res, i); atttypid = PQftype(res, i); atttypmod = PQfmod(res, i); if (PQfsize(res, i) != get_typlen(atttypid)) elog(ERROR, "Size of remote field \"%s\" does not match size " "of local type \"%s\"", attname, format_type_with_typemod(atttypid, atttypmod)); attdim = 0; attisset = false; TupleDescInitEntry(desc, attnum, attname, atttypid, atttypmod, attdim, attisset); } return desc; }