From ae9bfc5d65123aaa0d1cca9988037489760bdeae Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Wed, 7 Jun 2017 15:14:55 -0400 Subject: [PATCH] postgres_fdw: Allow cancellation of transaction control commands. Commit f039eaac7131ef2a4cf63a10cf98486f8bcd09d2, later back-patched with commit 1b812afb0eafe125b820cc3b95e7ca03821aa675, allowed many of the queries issued by postgres_fdw to fetch remote data to respond to cancel interrupts in a timely fashion. However, it didn't do anything about the transaction control commands, which remained noninterruptible. Improve the situation by changing do_sql_command() to retrieve query results using pgfdw_get_result(), which uses the asynchronous interface to libpq so that it can check for interrupts every time libpq returns control. Since this might result in a situation where we can no longer be sure that the remote transaction state matches the local transaction state, add a facility to force all levels of the local transaction to abort if we've lost track of the remote state; without this, an apparently-successful commit of the local transaction might fail to commit changes made on the remote side. Also, add a 60-second timeout for queries issue during transaction abort; if that expires, give up and mark the state of the connection as unknown. Drop all such connections when we exit the local transaction. Together, these changes mean that if we're aborting the local toplevel transaction anyway, we can just drop the remote connection in lieu of waiting (possibly for a very long time) for it to complete an abort. This still leaves quite a bit of room for improvement. PQcancel() has no asynchronous interface, so if we get stuck sending the cancel request we'll still hang. Also, PQsetnonblocking() is not used, which means we could block uninterruptibly when sending a query. There might be some other optimizations possible as well. Nonetheless, this allows us to escape a wait for an unresponsive remote server quickly in many more cases than previously. Report by Suraj Kharage. Patch by me and Rafia Sabih. Review and testing by Amit Kapila and Tushar Ahuja. Discussion: http://postgr.es/m/CAF1DzPU8Kx+fMXEbFoP289xtm3bz3t+ZfxhmKavr98Bh-C0TqQ@mail.gmail.com --- contrib/postgres_fdw/connection.c | 361 +++++++++++++++++++++++++----- 1 file changed, 308 insertions(+), 53 deletions(-) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index c6e3d44515..1b691fb05e 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -14,6 +14,8 @@ #include "postgres_fdw.h" +#include "access/htup_details.h" +#include "catalog/pg_user_mapping.h" #include "access/xact.h" #include "mb/pg_wchar.h" #include "miscadmin.h" @@ -21,6 +23,7 @@ #include "storage/latch.h" #include "utils/hsearch.h" #include "utils/memutils.h" +#include "utils/syscache.h" /* @@ -49,6 +52,7 @@ typedef struct ConnCacheEntry * one level of subxact open, etc */ bool have_prep_stmt; /* have we prepared any stmts in this xact? */ bool have_error; /* have any subxacts aborted in this xact? */ + bool changing_xact_state; /* xact state change in process */ } ConnCacheEntry; /* @@ -74,6 +78,12 @@ static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg); +static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry); +static bool pgfdw_cancel_query(PGconn *conn); +static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, + bool ignore_errors); +static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, + PGresult **result); /* @@ -139,8 +149,12 @@ GetConnection(UserMapping *user, bool will_prep_stmt) entry->xact_depth = 0; entry->have_prep_stmt = false; entry->have_error = false; + entry->changing_xact_state = false; } + /* Reject further use of connections which failed abort cleanup. */ + pgfdw_reject_incomplete_xact_state_change(entry); + /* * We don't check the health of cached connection here, because it would * require some overhead. Broken connection will be detected when the @@ -343,7 +357,9 @@ do_sql_command(PGconn *conn, const char *sql) { PGresult *res; - res = PQexec(conn, sql); + if (!PQsendQuery(conn, sql)) + pgfdw_report_error(ERROR, NULL, conn, false, sql); + res = pgfdw_get_result(conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, true, sql); PQclear(res); @@ -376,8 +392,10 @@ begin_remote_xact(ConnCacheEntry *entry) sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE"; else sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ"; + entry->changing_xact_state = true; do_sql_command(entry->conn, sql); entry->xact_depth = 1; + entry->changing_xact_state = false; } /* @@ -390,8 +408,10 @@ begin_remote_xact(ConnCacheEntry *entry) char sql[64]; snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1); + entry->changing_xact_state = true; do_sql_command(entry->conn, sql); entry->xact_depth++; + entry->changing_xact_state = false; } } @@ -604,6 +624,8 @@ pgfdw_xact_callback(XactEvent event, void *arg) /* If it has an open remote transaction, try to close it */ if (entry->xact_depth > 0) { + bool abort_cleanup_failure = false; + elog(DEBUG3, "closing remote transaction on connection %p", entry->conn); @@ -611,8 +633,17 @@ pgfdw_xact_callback(XactEvent event, void *arg) { case XACT_EVENT_PARALLEL_PRE_COMMIT: case XACT_EVENT_PRE_COMMIT: + + /* + * If abort cleanup previously failed for this connection, + * we can't issue any more commands against it. + */ + pgfdw_reject_incomplete_xact_state_change(entry); + /* Commit all remote transactions during pre-commit */ + entry->changing_xact_state = true; do_sql_command(entry->conn, "COMMIT TRANSACTION"); + entry->changing_xact_state = false; /* * If there were any errors in subtransactions, and we @@ -660,6 +691,27 @@ pgfdw_xact_callback(XactEvent event, void *arg) break; case XACT_EVENT_PARALLEL_ABORT: case XACT_EVENT_ABORT: + + /* + * Don't try to clean up the connection if we're already + * in error recursion trouble. + */ + if (in_error_recursion_trouble()) + entry->changing_xact_state = true; + + /* + * If connection is already unsalvageable, don't touch it + * further. + */ + if (entry->changing_xact_state) + break; + + /* + * Mark this connection as in the process of changing + * transaction state. + */ + entry->changing_xact_state = true; + /* Assume we might have lost track of prepared statements */ entry->have_error = true; @@ -670,40 +722,35 @@ pgfdw_xact_callback(XactEvent event, void *arg) * command is still being processed by the remote server, * and if so, request cancellation of the command. */ - if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE) + if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE && + !pgfdw_cancel_query(entry->conn)) { - PGcancel *cancel; - char errbuf[256]; - - if ((cancel = PQgetCancel(entry->conn))) - { - if (!PQcancel(cancel, errbuf, sizeof(errbuf))) - ereport(WARNING, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("could not send cancel request: %s", - errbuf))); - PQfreeCancel(cancel); - } + /* Unable to cancel running query. */ + abort_cleanup_failure = true; + } + else if (!pgfdw_exec_cleanup_query(entry->conn, + "ABORT TRANSACTION", + false)) + { + /* Unable to abort remote transaction. */ + abort_cleanup_failure = true; + } + else if (entry->have_prep_stmt && entry->have_error && + !pgfdw_exec_cleanup_query(entry->conn, + "DEALLOCATE ALL", + true)) + { + /* Trouble clearing prepared statements. */ + abort_cleanup_failure = true; } - - /* If we're aborting, abort all remote transactions too */ - res = PQexec(entry->conn, "ABORT TRANSACTION"); - /* Note: can't throw ERROR, it would be infinite loop */ - if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(WARNING, res, entry->conn, true, - "ABORT TRANSACTION"); else { - PQclear(res); - /* As above, make sure to clear any prepared stmts */ - if (entry->have_prep_stmt && entry->have_error) - { - res = PQexec(entry->conn, "DEALLOCATE ALL"); - PQclear(res); - } entry->have_prep_stmt = false; entry->have_error = false; } + + /* Disarm changing_xact_state if it all worked. */ + entry->changing_xact_state = abort_cleanup_failure; break; } } @@ -716,11 +763,13 @@ pgfdw_xact_callback(XactEvent event, void *arg) * recover. Next GetConnection will open a new connection. */ if (PQstatus(entry->conn) != CONNECTION_OK || - PQtransactionStatus(entry->conn) != PQTRANS_IDLE) + PQtransactionStatus(entry->conn) != PQTRANS_IDLE || + entry->changing_xact_state) { elog(DEBUG3, "discarding connection %p", entry->conn); PQfinish(entry->conn); entry->conn = NULL; + entry->changing_xact_state = false; } } @@ -763,7 +812,6 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, hash_seq_init(&scan, ConnectionHash); while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) { - PGresult *res; char sql[100]; /* @@ -779,12 +827,33 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, if (event == SUBXACT_EVENT_PRE_COMMIT_SUB) { + /* + * If abort cleanup previously failed for this connection, we + * can't issue any more commands against it. + */ + pgfdw_reject_incomplete_xact_state_change(entry); + /* Commit all remote subtransactions during pre-commit */ snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel); + entry->changing_xact_state = true; do_sql_command(entry->conn, sql); + entry->changing_xact_state = false; } - else + else if (in_error_recursion_trouble()) { + /* + * Don't try to clean up the connection if we're already in error + * recursion trouble. + */ + entry->changing_xact_state = true; + } + else if (!entry->changing_xact_state) + { + bool abort_cleanup_failure = false; + + /* Remember that abort cleanup is in progress. */ + entry->changing_xact_state = true; + /* Assume we might have lost track of prepared statements */ entry->have_error = true; @@ -795,34 +864,220 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, * processed by the remote server, and if so, request cancellation * of the command. */ - if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE) + if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE && + !pgfdw_cancel_query(entry->conn)) + abort_cleanup_failure = true; + else { - PGcancel *cancel; - char errbuf[256]; - - if ((cancel = PQgetCancel(entry->conn))) - { - if (!PQcancel(cancel, errbuf, sizeof(errbuf))) - ereport(WARNING, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("could not send cancel request: %s", - errbuf))); - PQfreeCancel(cancel); - } + /* Rollback all remote subtransactions during abort */ + snprintf(sql, sizeof(sql), + "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", + curlevel, curlevel); + if (!pgfdw_exec_cleanup_query(entry->conn, sql, false)) + abort_cleanup_failure = true; } - /* Rollback all remote subtransactions during abort */ - snprintf(sql, sizeof(sql), - "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", - curlevel, curlevel); - res = PQexec(entry->conn, sql); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(WARNING, res, entry->conn, true, sql); - else - PQclear(res); + /* Disarm changing_xact_state if it all worked. */ + entry->changing_xact_state = abort_cleanup_failure; } /* OK, we're outta that level of subtransaction */ entry->xact_depth--; } } + +/* + * Raise an error if the given connection cache entry is marked as being + * in the middle of an xact state change. This should be called at which no + * such change is expected to be in progress; if one is found to be in + * progress, it means that we aborted in the middle of a previous state change + * and now don't know what the remote transaction state actually is. + * Such connections can't safely be further used. Re-establishing the + * connection would change the snapshot and roll back any writes already + * performed, so that's not an option, either. Thus, we must abort. + */ +static void +pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry) +{ + HeapTuple tup; + Form_pg_user_mapping umform; + ForeignServer *server; + + if (!entry->changing_xact_state) + return; + + tup = SearchSysCache1(USERMAPPINGOID, + ObjectIdGetDatum(entry->key)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for user mapping %u", entry->key); + umform = (Form_pg_user_mapping) GETSTRUCT(tup); + server = GetForeignServer(umform->umserver); + ReleaseSysCache(tup); + + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_EXCEPTION), + errmsg("connection to server \"%s\" was lost", + server->servername))); +} + +/* + * Cancel the currently-in-progress query (whose query text we do not have) + * and ignore the result. Returns true if we successfully cancel the query + * and discard any pending result, and false if not. + */ +static bool +pgfdw_cancel_query(PGconn *conn) +{ + PGcancel *cancel; + char errbuf[256]; + PGresult *result = NULL; + TimestampTz endtime; + + /* + * If it takes too long to cancel the query and discard the result, assume + * the connection is dead. + */ + endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000); + + /* + * Issue cancel request. Unfortunately, there's no good way to limit the + * amount of time that we might block inside PQgetCancel(). + */ + if ((cancel = PQgetCancel(conn))) + { + if (!PQcancel(cancel, errbuf, sizeof(errbuf))) + { + ereport(WARNING, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not send cancel request: %s", + errbuf))); + PQfreeCancel(cancel); + return false; + } + PQfreeCancel(cancel); + } + + /* Get and discard the result of the query. */ + if (pgfdw_get_cleanup_result(conn, endtime, &result)) + return false; + PQclear(result); + + return true; +} + +/* + * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the + * result. If the query is executed without error, the return value is true. + * If the query is executed successfully but returns an error, the return + * value is true if and only if ignore_errors is set. If the query can't be + * sent or times out, the return value is false. + */ +static bool +pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors) +{ + PGresult *result = NULL; + TimestampTz endtime; + + /* + * If it takes too long to execute a cleanup query, assume the connection + * is dead. It's fairly likely that this is why we aborted in the first + * place (e.g. statement timeout, user cancel), so the timeout shouldn't + * be too long. + */ + endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000); + + /* + * Submit a query. Since we don't use non-blocking mode, this also can + * block. But its risk is relatively small, so we ignore that for now. + */ + if (!PQsendQuery(conn, query)) + { + pgfdw_report_error(WARNING, NULL, conn, false, query); + return false; + } + + /* Get the result of the query. */ + if (pgfdw_get_cleanup_result(conn, endtime, &result)) + return false; + + /* Issue a warning if not successful. */ + if (PQresultStatus(result) != PGRES_COMMAND_OK) + { + pgfdw_report_error(WARNING, result, conn, true, query); + return ignore_errors; + } + + return true; +} + +/* + * Get, during abort cleanup, the result of a query that is in progress. This + * might be a query that is being interrupted by transaction abort, or it might + * be a query that was initiated as part of transaction abort to get the remote + * side back to the appropriate state. + * + * It's not a huge problem if we throw an ERROR here, but if we get into error + * recursion trouble, we'll end up slamming the connection shut, which will + * necessitate failing the entire toplevel transaction even if subtransactions + * were used. Try to use WARNING where we can. + * + * endtime is the time at which we should give up and assume the remote + * side is dead. Returns true if the timeout expired, otherwise false. + * Sets *result except in case of a timeout. + */ +static bool +pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result) +{ + PGresult *last_res = NULL; + + for (;;) + { + PGresult *res; + + while (PQisBusy(conn)) + { + int wc; + TimestampTz now = GetCurrentTimestamp(); + long secs; + int microsecs; + long cur_timeout; + + /* If timeout has expired, give up, else get sleep time. */ + if (now >= endtime) + return true; + TimestampDifference(now, endtime, &secs, µsecs); + + /* To protect against clock skew, limit sleep to one minute. */ + cur_timeout = Min(60000, secs * USECS_PER_SEC + microsecs); + + /* Sleep until there's something to do */ + wc = WaitLatchOrSocket(MyLatch, + WL_LATCH_SET | WL_SOCKET_READABLE | WL_TIMEOUT, + PQsocket(conn), + cur_timeout, PG_WAIT_EXTENSION); + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + + /* Data available in socket */ + if (wc & WL_SOCKET_READABLE) + { + if (!PQconsumeInput(conn)) + { + *result = NULL; + return false; + } + } + } + + res = PQgetResult(conn); + if (res == NULL) + break; /* query is complete */ + + PQclear(last_res); + last_res = res; + } + + *result = last_res; + return false; +}