From 85c69611288f4096b7460d980bedaa777f824d24 Mon Sep 17 00:00:00 2001 From: Fujii Masao Date: Wed, 22 Sep 2021 23:47:36 +0900 Subject: [PATCH] postgres_fdw: Refactor transaction rollback code to avoid code duplication. In postgres_fdw, pgfdw_xact_callback() and pgfdw_subxact_callback() callback functions do almost the same thing to rollback remote toplevel- and sub-transaction. But previously their such rollback logics were implemented separately in each function and in different way. Which could decrease the readability and maintainability of the code. To fix the issue, this commit creates the common function to rollback remote transactions, and makes those callback functions use it. Which allows us to avoid unnecessary code duplication. Author: Fujii Masao Reviewed-by: Zhihong Yu, Bharath Rupireddy Discussion: https://postgr.es/m/62fbb63a-d46c-fb47-a56d-f6be1909aa44@oss.nttdata.com --- contrib/postgres_fdw/connection.c | 177 +++++++++++++----------------- 1 file changed, 75 insertions(+), 102 deletions(-) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 705f60a3ae..1e67da983c 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -105,6 +105,8 @@ static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors); static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result); +static void pgfdw_abort_cleanup(ConnCacheEntry *entry, const char *sql, + bool toplevel); static bool UserMappingPasswordRequired(UserMapping *user); static bool disconnect_cached_connections(Oid serverid); @@ -872,8 +874,6 @@ pgfdw_xact_callback(XactEvent event, void *arg) /* If it has an open remote transaction, try to close it */ if (entry->xact_depth > 0) { - bool abort_cleanup_failure = false; - elog(DEBUG3, "closing remote transaction on connection %p", entry->conn); @@ -940,67 +940,7 @@ pgfdw_xact_callback(XactEvent event, void *arg) case XACT_EVENT_PARALLEL_ABORT: case XACT_EVENT_ABORT: - /* - * Don't try to clean up the connection if we're already - * in error recursion trouble. - */ - if (in_error_recursion_trouble()) - entry->changing_xact_state = true; - - /* - * If connection is already unsalvageable, don't touch it - * further. - */ - if (entry->changing_xact_state) - break; - - /* - * Mark this connection as in the process of changing - * transaction state. - */ - entry->changing_xact_state = true; - - /* Assume we might have lost track of prepared statements */ - entry->have_error = true; - - /* - * If a command has been submitted to the remote server by - * using an asynchronous execution function, the command - * might not have yet completed. Check to see if a - * command is still being processed by the remote server, - * and if so, request cancellation of the command. - */ - if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE && - !pgfdw_cancel_query(entry->conn)) - { - /* Unable to cancel running query. */ - abort_cleanup_failure = true; - } - else if (!pgfdw_exec_cleanup_query(entry->conn, - "ABORT TRANSACTION", - false)) - { - /* Unable to abort remote transaction. */ - abort_cleanup_failure = true; - } - else if (entry->have_prep_stmt && entry->have_error && - !pgfdw_exec_cleanup_query(entry->conn, - "DEALLOCATE ALL", - true)) - { - /* Trouble clearing prepared statements. */ - abort_cleanup_failure = true; - } - else - { - entry->have_prep_stmt = false; - entry->have_error = false; - /* Also reset per-connection state */ - memset(&entry->state, 0, sizeof(entry->state)); - } - - /* Disarm changing_xact_state if it all worked. */ - entry->changing_xact_state = abort_cleanup_failure; + pgfdw_abort_cleanup(entry, "ABORT TRANSACTION", true); break; } } @@ -1091,46 +1031,13 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, do_sql_command(entry->conn, sql); entry->changing_xact_state = false; } - else if (in_error_recursion_trouble()) + else { - /* - * Don't try to clean up the connection if we're already in error - * recursion trouble. - */ - entry->changing_xact_state = true; - } - else if (!entry->changing_xact_state) - { - bool abort_cleanup_failure = false; - - /* Remember that abort cleanup is in progress. */ - entry->changing_xact_state = true; - - /* Assume we might have lost track of prepared statements */ - entry->have_error = true; - - /* - * If a command has been submitted to the remote server by using - * an asynchronous execution function, the command might not have - * yet completed. Check to see if a command is still being - * processed by the remote server, and if so, request cancellation - * of the command. - */ - if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE && - !pgfdw_cancel_query(entry->conn)) - abort_cleanup_failure = true; - else - { - /* Rollback all remote subtransactions during abort */ - snprintf(sql, sizeof(sql), - "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", - curlevel, curlevel); - if (!pgfdw_exec_cleanup_query(entry->conn, sql, false)) - abort_cleanup_failure = true; - } - - /* Disarm changing_xact_state if it all worked. */ - entry->changing_xact_state = abort_cleanup_failure; + /* Rollback all remote subtransactions during abort */ + snprintf(sql, sizeof(sql), + "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", + curlevel, curlevel); + pgfdw_abort_cleanup(entry, sql, false); } /* OK, we're outta that level of subtransaction */ @@ -1409,6 +1316,72 @@ exit: ; return timed_out; } +/* + * Abort remote transaction. + * + * The statement specified in "sql" is sent to the remote server, + * in order to rollback the remote transaction. + * + * "toplevel" should be set to true if toplevel (main) transaction is + * rollbacked, false otherwise. + * + * Set entry->changing_xact_state to false on success, true on failure. + */ +static void +pgfdw_abort_cleanup(ConnCacheEntry *entry, const char *sql, bool toplevel) +{ + /* + * Don't try to clean up the connection if we're already in error + * recursion trouble. + */ + if (in_error_recursion_trouble()) + entry->changing_xact_state = true; + + /* + * If connection is already unsalvageable, don't touch it further. + */ + if (entry->changing_xact_state) + return; + + /* + * Mark this connection as in the process of changing transaction state. + */ + entry->changing_xact_state = true; + + /* Assume we might have lost track of prepared statements */ + entry->have_error = true; + + /* + * If a command has been submitted to the remote server by using an + * asynchronous execution function, the command might not have yet + * completed. Check to see if a command is still being processed by the + * remote server, and if so, request cancellation of the command. + */ + if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE && + !pgfdw_cancel_query(entry->conn)) + return; /* Unable to cancel running query */ + + if (!pgfdw_exec_cleanup_query(entry->conn, sql, false)) + return; /* Unable to abort remote transaction */ + + if (toplevel) + { + if (entry->have_prep_stmt && entry->have_error && + !pgfdw_exec_cleanup_query(entry->conn, + "DEALLOCATE ALL", + true)) + return; /* Trouble clearing prepared statements */ + + entry->have_prep_stmt = false; + entry->have_error = false; + /* Also reset per-connection state */ + memset(&entry->state, 0, sizeof(entry->state)); + } + + /* Disarm changing_xact_state if it all worked */ + entry->changing_xact_state = false; +} + /* * List active foreign server connections. *