postgres_fdw: Refactor transaction rollback code to avoid code duplication.

In postgres_fdw, pgfdw_xact_callback() and pgfdw_subxact_callback()
callback functions do almost the same thing to rollback remote toplevel-
and sub-transaction. But previously their such rollback logics were
implemented separately in each function and in different way. Which
could decrease the readability and maintainability of the code.

To fix the issue, this commit creates the common function to rollback
remote transactions, and makes those callback functions use it. Which
allows us to avoid unnecessary code duplication.

Author: Fujii Masao
Reviewed-by: Zhihong Yu, Bharath Rupireddy
Discussion: https://postgr.es/m/62fbb63a-d46c-fb47-a56d-f6be1909aa44@oss.nttdata.com
This commit is contained in:
Fujii Masao 2021-09-22 23:47:36 +09:00
parent f9ea296031
commit 85c6961128
1 changed files with 75 additions and 102 deletions

View File

@ -105,6 +105,8 @@ static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
bool ignore_errors);
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
PGresult **result);
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, const char *sql,
bool toplevel);
static bool UserMappingPasswordRequired(UserMapping *user);
static bool disconnect_cached_connections(Oid serverid);
@ -872,8 +874,6 @@ pgfdw_xact_callback(XactEvent event, void *arg)
/* If it has an open remote transaction, try to close it */
if (entry->xact_depth > 0)
{
bool abort_cleanup_failure = false;
elog(DEBUG3, "closing remote transaction on connection %p",
entry->conn);
@ -940,67 +940,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
case XACT_EVENT_PARALLEL_ABORT:
case XACT_EVENT_ABORT:
/*
* Don't try to clean up the connection if we're already
* in error recursion trouble.
*/
if (in_error_recursion_trouble())
entry->changing_xact_state = true;
/*
* If connection is already unsalvageable, don't touch it
* further.
*/
if (entry->changing_xact_state)
break;
/*
* Mark this connection as in the process of changing
* transaction state.
*/
entry->changing_xact_state = true;
/* Assume we might have lost track of prepared statements */
entry->have_error = true;
/*
* If a command has been submitted to the remote server by
* using an asynchronous execution function, the command
* might not have yet completed. Check to see if a
* command is still being processed by the remote server,
* and if so, request cancellation of the command.
*/
if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
!pgfdw_cancel_query(entry->conn))
{
/* Unable to cancel running query. */
abort_cleanup_failure = true;
}
else if (!pgfdw_exec_cleanup_query(entry->conn,
"ABORT TRANSACTION",
false))
{
/* Unable to abort remote transaction. */
abort_cleanup_failure = true;
}
else if (entry->have_prep_stmt && entry->have_error &&
!pgfdw_exec_cleanup_query(entry->conn,
"DEALLOCATE ALL",
true))
{
/* Trouble clearing prepared statements. */
abort_cleanup_failure = true;
}
else
{
entry->have_prep_stmt = false;
entry->have_error = false;
/* Also reset per-connection state */
memset(&entry->state, 0, sizeof(entry->state));
}
/* Disarm changing_xact_state if it all worked. */
entry->changing_xact_state = abort_cleanup_failure;
pgfdw_abort_cleanup(entry, "ABORT TRANSACTION", true);
break;
}
}
@ -1091,46 +1031,13 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
do_sql_command(entry->conn, sql);
entry->changing_xact_state = false;
}
else if (in_error_recursion_trouble())
else
{
/*
* Don't try to clean up the connection if we're already in error
* recursion trouble.
*/
entry->changing_xact_state = true;
}
else if (!entry->changing_xact_state)
{
bool abort_cleanup_failure = false;
/* Remember that abort cleanup is in progress. */
entry->changing_xact_state = true;
/* Assume we might have lost track of prepared statements */
entry->have_error = true;
/*
* If a command has been submitted to the remote server by using
* an asynchronous execution function, the command might not have
* yet completed. Check to see if a command is still being
* processed by the remote server, and if so, request cancellation
* of the command.
*/
if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
!pgfdw_cancel_query(entry->conn))
abort_cleanup_failure = true;
else
{
/* Rollback all remote subtransactions during abort */
snprintf(sql, sizeof(sql),
"ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
curlevel, curlevel);
if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
abort_cleanup_failure = true;
}
/* Disarm changing_xact_state if it all worked. */
entry->changing_xact_state = abort_cleanup_failure;
/* Rollback all remote subtransactions during abort */
snprintf(sql, sizeof(sql),
"ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
curlevel, curlevel);
pgfdw_abort_cleanup(entry, sql, false);
}
/* OK, we're outta that level of subtransaction */
@ -1409,6 +1316,72 @@ exit: ;
return timed_out;
}
/*
* Abort remote transaction.
*
* The statement specified in "sql" is sent to the remote server,
* in order to rollback the remote transaction.
*
* "toplevel" should be set to true if toplevel (main) transaction is
* rollbacked, false otherwise.
*
* Set entry->changing_xact_state to false on success, true on failure.
*/
static void
pgfdw_abort_cleanup(ConnCacheEntry *entry, const char *sql, bool toplevel)
{
/*
* Don't try to clean up the connection if we're already in error
* recursion trouble.
*/
if (in_error_recursion_trouble())
entry->changing_xact_state = true;
/*
* If connection is already unsalvageable, don't touch it further.
*/
if (entry->changing_xact_state)
return;
/*
* Mark this connection as in the process of changing transaction state.
*/
entry->changing_xact_state = true;
/* Assume we might have lost track of prepared statements */
entry->have_error = true;
/*
* If a command has been submitted to the remote server by using an
* asynchronous execution function, the command might not have yet
* completed. Check to see if a command is still being processed by the
* remote server, and if so, request cancellation of the command.
*/
if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
!pgfdw_cancel_query(entry->conn))
return; /* Unable to cancel running query */
if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
return; /* Unable to abort remote transaction */
if (toplevel)
{
if (entry->have_prep_stmt && entry->have_error &&
!pgfdw_exec_cleanup_query(entry->conn,
"DEALLOCATE ALL",
true))
return; /* Trouble clearing prepared statements */
entry->have_prep_stmt = false;
entry->have_error = false;
/* Also reset per-connection state */
memset(&entry->state, 0, sizeof(entry->state));
}
/* Disarm changing_xact_state if it all worked */
entry->changing_xact_state = false;
}
/*
* List active foreign server connections.
*