postgres_fdw: Remove duplicate code in DML execution callback functions.

postgresExecForeignInsert(), postgresExecForeignUpdate(), and
postgresExecForeignDelete() are coded almost identically, except that
postgresExecForeignInsert() does not need CTID.  Extract that code into
a separate function and use it in all the three function implementations.

Author: Ashutosh Bapat
Reviewed-By: Michael Paquier
Discussion: https://postgr.es/m/CAFjFpRcz8yoY7cBTYofcrCLwjaDeCcGKyTUivUbRiA57y3v-bw%40mail.gmail.com
This commit is contained in:
Etsuro Fujita 2019-01-17 14:37:33 +09:00
parent d723f56872
commit 6c61d7c593
1 changed files with 103 additions and 180 deletions

View File

@ -391,6 +391,11 @@ static PgFdwModifyState *create_foreign_modify(EState *estate,
List *target_attrs,
bool has_returning,
List *retrieved_attrs);
static TupleTableSlot *execute_foreign_modify(EState *estate,
ResultRelInfo *resultRelInfo,
CmdType operation,
TupleTableSlot *slot,
TupleTableSlot *planSlot);
static void prepare_foreign_modify(PgFdwModifyState *fmstate);
static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
ItemPointer tupleid,
@ -1776,58 +1781,8 @@ postgresExecForeignInsert(EState *estate,
TupleTableSlot *slot,
TupleTableSlot *planSlot)
{
PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
const char **p_values;
PGresult *res;
int n_rows;
/* Set up the prepared statement on the remote server, if we didn't yet */
if (!fmstate->p_name)
prepare_foreign_modify(fmstate);
/* Convert parameters needed by prepared statement to text form */
p_values = convert_prep_stmt_params(fmstate, NULL, slot);
/*
* Execute the prepared statement.
*/
if (!PQsendQueryPrepared(fmstate->conn,
fmstate->p_name,
fmstate->p_nums,
p_values,
NULL,
NULL,
0))
pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
/*
* Get the result, and check for success.
*
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
res = pgfdw_get_result(fmstate->conn, fmstate->query);
if (PQresultStatus(res) !=
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
/* Check number of rows affected, and fetch RETURNING tuple if any */
if (fmstate->has_returning)
{
n_rows = PQntuples(res);
if (n_rows > 0)
store_returning_result(fmstate, slot, res);
}
else
n_rows = atoi(PQcmdTuples(res));
/* And clean up */
PQclear(res);
MemoryContextReset(fmstate->temp_cxt);
/* Return NULL if nothing was inserted on the remote end */
return (n_rows > 0) ? slot : NULL;
return execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
slot, planSlot);
}
/*
@ -1840,70 +1795,8 @@ postgresExecForeignUpdate(EState *estate,
TupleTableSlot *slot,
TupleTableSlot *planSlot)
{
PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
Datum datum;
bool isNull;
const char **p_values;
PGresult *res;
int n_rows;
/* Set up the prepared statement on the remote server, if we didn't yet */
if (!fmstate->p_name)
prepare_foreign_modify(fmstate);
/* Get the ctid that was passed up as a resjunk column */
datum = ExecGetJunkAttribute(planSlot,
fmstate->ctidAttno,
&isNull);
/* shouldn't ever get a null result... */
if (isNull)
elog(ERROR, "ctid is NULL");
/* Convert parameters needed by prepared statement to text form */
p_values = convert_prep_stmt_params(fmstate,
(ItemPointer) DatumGetPointer(datum),
slot);
/*
* Execute the prepared statement.
*/
if (!PQsendQueryPrepared(fmstate->conn,
fmstate->p_name,
fmstate->p_nums,
p_values,
NULL,
NULL,
0))
pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
/*
* Get the result, and check for success.
*
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
res = pgfdw_get_result(fmstate->conn, fmstate->query);
if (PQresultStatus(res) !=
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
/* Check number of rows affected, and fetch RETURNING tuple if any */
if (fmstate->has_returning)
{
n_rows = PQntuples(res);
if (n_rows > 0)
store_returning_result(fmstate, slot, res);
}
else
n_rows = atoi(PQcmdTuples(res));
/* And clean up */
PQclear(res);
MemoryContextReset(fmstate->temp_cxt);
/* Return NULL if nothing was updated on the remote end */
return (n_rows > 0) ? slot : NULL;
return execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE,
slot, planSlot);
}
/*
@ -1916,70 +1809,8 @@ postgresExecForeignDelete(EState *estate,
TupleTableSlot *slot,
TupleTableSlot *planSlot)
{
PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
Datum datum;
bool isNull;
const char **p_values;
PGresult *res;
int n_rows;
/* Set up the prepared statement on the remote server, if we didn't yet */
if (!fmstate->p_name)
prepare_foreign_modify(fmstate);
/* Get the ctid that was passed up as a resjunk column */
datum = ExecGetJunkAttribute(planSlot,
fmstate->ctidAttno,
&isNull);
/* shouldn't ever get a null result... */
if (isNull)
elog(ERROR, "ctid is NULL");
/* Convert parameters needed by prepared statement to text form */
p_values = convert_prep_stmt_params(fmstate,
(ItemPointer) DatumGetPointer(datum),
NULL);
/*
* Execute the prepared statement.
*/
if (!PQsendQueryPrepared(fmstate->conn,
fmstate->p_name,
fmstate->p_nums,
p_values,
NULL,
NULL,
0))
pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
/*
* Get the result, and check for success.
*
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
res = pgfdw_get_result(fmstate->conn, fmstate->query);
if (PQresultStatus(res) !=
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
/* Check number of rows affected, and fetch RETURNING tuple if any */
if (fmstate->has_returning)
{
n_rows = PQntuples(res);
if (n_rows > 0)
store_returning_result(fmstate, slot, res);
}
else
n_rows = atoi(PQcmdTuples(res));
/* And clean up */
PQclear(res);
MemoryContextReset(fmstate->temp_cxt);
/* Return NULL if nothing was deleted on the remote end */
return (n_rows > 0) ? slot : NULL;
return execute_foreign_modify(estate, resultRelInfo, CMD_DELETE,
slot, planSlot);
}
/*
@ -3425,6 +3256,98 @@ create_foreign_modify(EState *estate,
return fmstate;
}
/*
* execute_foreign_modify
* Perform foreign-table modification as required, and fetch RETURNING
* result if any. (This is the shared guts of postgresExecForeignInsert,
* postgresExecForeignUpdate, and postgresExecForeignDelete.)
*/
static TupleTableSlot *
execute_foreign_modify(EState *estate,
ResultRelInfo *resultRelInfo,
CmdType operation,
TupleTableSlot *slot,
TupleTableSlot *planSlot)
{
PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
ItemPointer ctid = NULL;
const char **p_values;
PGresult *res;
int n_rows;
/* The operation should be INSERT, UPDATE, or DELETE */
Assert(operation == CMD_INSERT ||
operation == CMD_UPDATE ||
operation == CMD_DELETE);
/* Set up the prepared statement on the remote server, if we didn't yet */
if (!fmstate->p_name)
prepare_foreign_modify(fmstate);
/*
* For UPDATE/DELETE, get the ctid that was passed up as a resjunk column
*/
if (operation == CMD_UPDATE || operation == CMD_DELETE)
{
Datum datum;
bool isNull;
datum = ExecGetJunkAttribute(planSlot,
fmstate->ctidAttno,
&isNull);
/* shouldn't ever get a null result... */
if (isNull)
elog(ERROR, "ctid is NULL");
ctid = (ItemPointer) DatumGetPointer(datum);
}
/* Convert parameters needed by prepared statement to text form */
p_values = convert_prep_stmt_params(fmstate, ctid, slot);
/*
* Execute the prepared statement.
*/
if (!PQsendQueryPrepared(fmstate->conn,
fmstate->p_name,
fmstate->p_nums,
p_values,
NULL,
NULL,
0))
pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
/*
* Get the result, and check for success.
*
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
res = pgfdw_get_result(fmstate->conn, fmstate->query);
if (PQresultStatus(res) !=
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
/* Check number of rows affected, and fetch RETURNING tuple if any */
if (fmstate->has_returning)
{
n_rows = PQntuples(res);
if (n_rows > 0)
store_returning_result(fmstate, slot, res);
}
else
n_rows = atoi(PQcmdTuples(res));
/* And clean up */
PQclear(res);
MemoryContextReset(fmstate->temp_cxt);
/*
* Return NULL if nothing was inserted/updated/deleted on the remote end
*/
return (n_rows > 0) ? slot : NULL;
}
/*
* prepare_foreign_modify
* Establish a prepared statement for execution of INSERT/UPDATE/DELETE