diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index c6e3d44515..1b691fb05e 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -14,6 +14,8 @@ #include "postgres_fdw.h" +#include "access/htup_details.h" +#include "catalog/pg_user_mapping.h" #include "access/xact.h" #include "mb/pg_wchar.h" #include "miscadmin.h" @@ -21,6 +23,7 @@ #include "storage/latch.h" #include "utils/hsearch.h" #include "utils/memutils.h" +#include "utils/syscache.h" /* @@ -49,6 +52,7 @@ typedef struct ConnCacheEntry * one level of subxact open, etc */ bool have_prep_stmt; /* have we prepared any stmts in this xact? */ bool have_error; /* have any subxacts aborted in this xact? */ + bool changing_xact_state; /* xact state change in process */ } ConnCacheEntry; /* @@ -74,6 +78,12 @@ static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg); +static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry); +static bool pgfdw_cancel_query(PGconn *conn); +static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, + bool ignore_errors); +static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, + PGresult **result); /* @@ -139,8 +149,12 @@ GetConnection(UserMapping *user, bool will_prep_stmt) entry->xact_depth = 0; entry->have_prep_stmt = false; entry->have_error = false; + entry->changing_xact_state = false; } + /* Reject further use of connections which failed abort cleanup. */ + pgfdw_reject_incomplete_xact_state_change(entry); + /* * We don't check the health of cached connection here, because it would * require some overhead. Broken connection will be detected when the @@ -343,7 +357,9 @@ do_sql_command(PGconn *conn, const char *sql) { PGresult *res; - res = PQexec(conn, sql); + if (!PQsendQuery(conn, sql)) + pgfdw_report_error(ERROR, NULL, conn, false, sql); + res = pgfdw_get_result(conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, true, sql); PQclear(res); @@ -376,8 +392,10 @@ begin_remote_xact(ConnCacheEntry *entry) sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE"; else sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ"; + entry->changing_xact_state = true; do_sql_command(entry->conn, sql); entry->xact_depth = 1; + entry->changing_xact_state = false; } /* @@ -390,8 +408,10 @@ begin_remote_xact(ConnCacheEntry *entry) char sql[64]; snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1); + entry->changing_xact_state = true; do_sql_command(entry->conn, sql); entry->xact_depth++; + entry->changing_xact_state = false; } } @@ -604,6 +624,8 @@ pgfdw_xact_callback(XactEvent event, void *arg) /* If it has an open remote transaction, try to close it */ if (entry->xact_depth > 0) { + bool abort_cleanup_failure = false; + elog(DEBUG3, "closing remote transaction on connection %p", entry->conn); @@ -611,8 +633,17 @@ pgfdw_xact_callback(XactEvent event, void *arg) { case XACT_EVENT_PARALLEL_PRE_COMMIT: case XACT_EVENT_PRE_COMMIT: + + /* + * If abort cleanup previously failed for this connection, + * we can't issue any more commands against it. + */ + pgfdw_reject_incomplete_xact_state_change(entry); + /* Commit all remote transactions during pre-commit */ + entry->changing_xact_state = true; do_sql_command(entry->conn, "COMMIT TRANSACTION"); + entry->changing_xact_state = false; /* * If there were any errors in subtransactions, and we @@ -660,6 +691,27 @@ pgfdw_xact_callback(XactEvent event, void *arg) break; case XACT_EVENT_PARALLEL_ABORT: case XACT_EVENT_ABORT: + + /* + * Don't try to clean up the connection if we're already + * in error recursion trouble. + */ + if (in_error_recursion_trouble()) + entry->changing_xact_state = true; + + /* + * If connection is already unsalvageable, don't touch it + * further. + */ + if (entry->changing_xact_state) + break; + + /* + * Mark this connection as in the process of changing + * transaction state. + */ + entry->changing_xact_state = true; + /* Assume we might have lost track of prepared statements */ entry->have_error = true; @@ -670,40 +722,35 @@ pgfdw_xact_callback(XactEvent event, void *arg) * command is still being processed by the remote server, * and if so, request cancellation of the command. */ - if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE) + if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE && + !pgfdw_cancel_query(entry->conn)) { - PGcancel *cancel; - char errbuf[256]; - - if ((cancel = PQgetCancel(entry->conn))) - { - if (!PQcancel(cancel, errbuf, sizeof(errbuf))) - ereport(WARNING, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("could not send cancel request: %s", - errbuf))); - PQfreeCancel(cancel); - } + /* Unable to cancel running query. */ + abort_cleanup_failure = true; + } + else if (!pgfdw_exec_cleanup_query(entry->conn, + "ABORT TRANSACTION", + false)) + { + /* Unable to abort remote transaction. */ + abort_cleanup_failure = true; + } + else if (entry->have_prep_stmt && entry->have_error && + !pgfdw_exec_cleanup_query(entry->conn, + "DEALLOCATE ALL", + true)) + { + /* Trouble clearing prepared statements. */ + abort_cleanup_failure = true; } - - /* If we're aborting, abort all remote transactions too */ - res = PQexec(entry->conn, "ABORT TRANSACTION"); - /* Note: can't throw ERROR, it would be infinite loop */ - if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(WARNING, res, entry->conn, true, - "ABORT TRANSACTION"); else { - PQclear(res); - /* As above, make sure to clear any prepared stmts */ - if (entry->have_prep_stmt && entry->have_error) - { - res = PQexec(entry->conn, "DEALLOCATE ALL"); - PQclear(res); - } entry->have_prep_stmt = false; entry->have_error = false; } + + /* Disarm changing_xact_state if it all worked. */ + entry->changing_xact_state = abort_cleanup_failure; break; } } @@ -716,11 +763,13 @@ pgfdw_xact_callback(XactEvent event, void *arg) * recover. Next GetConnection will open a new connection. */ if (PQstatus(entry->conn) != CONNECTION_OK || - PQtransactionStatus(entry->conn) != PQTRANS_IDLE) + PQtransactionStatus(entry->conn) != PQTRANS_IDLE || + entry->changing_xact_state) { elog(DEBUG3, "discarding connection %p", entry->conn); PQfinish(entry->conn); entry->conn = NULL; + entry->changing_xact_state = false; } } @@ -763,7 +812,6 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, hash_seq_init(&scan, ConnectionHash); while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) { - PGresult *res; char sql[100]; /* @@ -779,12 +827,33 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, if (event == SUBXACT_EVENT_PRE_COMMIT_SUB) { + /* + * If abort cleanup previously failed for this connection, we + * can't issue any more commands against it. + */ + pgfdw_reject_incomplete_xact_state_change(entry); + /* Commit all remote subtransactions during pre-commit */ snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel); + entry->changing_xact_state = true; do_sql_command(entry->conn, sql); + entry->changing_xact_state = false; } - else + else if (in_error_recursion_trouble()) { + /* + * Don't try to clean up the connection if we're already in error + * recursion trouble. + */ + entry->changing_xact_state = true; + } + else if (!entry->changing_xact_state) + { + bool abort_cleanup_failure = false; + + /* Remember that abort cleanup is in progress. */ + entry->changing_xact_state = true; + /* Assume we might have lost track of prepared statements */ entry->have_error = true; @@ -795,34 +864,220 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, * processed by the remote server, and if so, request cancellation * of the command. */ - if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE) + if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE && + !pgfdw_cancel_query(entry->conn)) + abort_cleanup_failure = true; + else { - PGcancel *cancel; - char errbuf[256]; - - if ((cancel = PQgetCancel(entry->conn))) - { - if (!PQcancel(cancel, errbuf, sizeof(errbuf))) - ereport(WARNING, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("could not send cancel request: %s", - errbuf))); - PQfreeCancel(cancel); - } + /* Rollback all remote subtransactions during abort */ + snprintf(sql, sizeof(sql), + "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", + curlevel, curlevel); + if (!pgfdw_exec_cleanup_query(entry->conn, sql, false)) + abort_cleanup_failure = true; } - /* Rollback all remote subtransactions during abort */ - snprintf(sql, sizeof(sql), - "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", - curlevel, curlevel); - res = PQexec(entry->conn, sql); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(WARNING, res, entry->conn, true, sql); - else - PQclear(res); + /* Disarm changing_xact_state if it all worked. */ + entry->changing_xact_state = abort_cleanup_failure; } /* OK, we're outta that level of subtransaction */ entry->xact_depth--; } } + +/* + * Raise an error if the given connection cache entry is marked as being + * in the middle of an xact state change. This should be called at which no + * such change is expected to be in progress; if one is found to be in + * progress, it means that we aborted in the middle of a previous state change + * and now don't know what the remote transaction state actually is. + * Such connections can't safely be further used. Re-establishing the + * connection would change the snapshot and roll back any writes already + * performed, so that's not an option, either. Thus, we must abort. + */ +static void +pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry) +{ + HeapTuple tup; + Form_pg_user_mapping umform; + ForeignServer *server; + + if (!entry->changing_xact_state) + return; + + tup = SearchSysCache1(USERMAPPINGOID, + ObjectIdGetDatum(entry->key)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for user mapping %u", entry->key); + umform = (Form_pg_user_mapping) GETSTRUCT(tup); + server = GetForeignServer(umform->umserver); + ReleaseSysCache(tup); + + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_EXCEPTION), + errmsg("connection to server \"%s\" was lost", + server->servername))); +} + +/* + * Cancel the currently-in-progress query (whose query text we do not have) + * and ignore the result. Returns true if we successfully cancel the query + * and discard any pending result, and false if not. + */ +static bool +pgfdw_cancel_query(PGconn *conn) +{ + PGcancel *cancel; + char errbuf[256]; + PGresult *result = NULL; + TimestampTz endtime; + + /* + * If it takes too long to cancel the query and discard the result, assume + * the connection is dead. + */ + endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000); + + /* + * Issue cancel request. Unfortunately, there's no good way to limit the + * amount of time that we might block inside PQgetCancel(). + */ + if ((cancel = PQgetCancel(conn))) + { + if (!PQcancel(cancel, errbuf, sizeof(errbuf))) + { + ereport(WARNING, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not send cancel request: %s", + errbuf))); + PQfreeCancel(cancel); + return false; + } + PQfreeCancel(cancel); + } + + /* Get and discard the result of the query. */ + if (pgfdw_get_cleanup_result(conn, endtime, &result)) + return false; + PQclear(result); + + return true; +} + +/* + * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the + * result. If the query is executed without error, the return value is true. + * If the query is executed successfully but returns an error, the return + * value is true if and only if ignore_errors is set. If the query can't be + * sent or times out, the return value is false. + */ +static bool +pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors) +{ + PGresult *result = NULL; + TimestampTz endtime; + + /* + * If it takes too long to execute a cleanup query, assume the connection + * is dead. It's fairly likely that this is why we aborted in the first + * place (e.g. statement timeout, user cancel), so the timeout shouldn't + * be too long. + */ + endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000); + + /* + * Submit a query. Since we don't use non-blocking mode, this also can + * block. But its risk is relatively small, so we ignore that for now. + */ + if (!PQsendQuery(conn, query)) + { + pgfdw_report_error(WARNING, NULL, conn, false, query); + return false; + } + + /* Get the result of the query. */ + if (pgfdw_get_cleanup_result(conn, endtime, &result)) + return false; + + /* Issue a warning if not successful. */ + if (PQresultStatus(result) != PGRES_COMMAND_OK) + { + pgfdw_report_error(WARNING, result, conn, true, query); + return ignore_errors; + } + + return true; +} + +/* + * Get, during abort cleanup, the result of a query that is in progress. This + * might be a query that is being interrupted by transaction abort, or it might + * be a query that was initiated as part of transaction abort to get the remote + * side back to the appropriate state. + * + * It's not a huge problem if we throw an ERROR here, but if we get into error + * recursion trouble, we'll end up slamming the connection shut, which will + * necessitate failing the entire toplevel transaction even if subtransactions + * were used. Try to use WARNING where we can. + * + * endtime is the time at which we should give up and assume the remote + * side is dead. Returns true if the timeout expired, otherwise false. + * Sets *result except in case of a timeout. + */ +static bool +pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result) +{ + PGresult *last_res = NULL; + + for (;;) + { + PGresult *res; + + while (PQisBusy(conn)) + { + int wc; + TimestampTz now = GetCurrentTimestamp(); + long secs; + int microsecs; + long cur_timeout; + + /* If timeout has expired, give up, else get sleep time. */ + if (now >= endtime) + return true; + TimestampDifference(now, endtime, &secs, µsecs); + + /* To protect against clock skew, limit sleep to one minute. */ + cur_timeout = Min(60000, secs * USECS_PER_SEC + microsecs); + + /* Sleep until there's something to do */ + wc = WaitLatchOrSocket(MyLatch, + WL_LATCH_SET | WL_SOCKET_READABLE | WL_TIMEOUT, + PQsocket(conn), + cur_timeout, PG_WAIT_EXTENSION); + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + + /* Data available in socket */ + if (wc & WL_SOCKET_READABLE) + { + if (!PQconsumeInput(conn)) + { + *result = NULL; + return false; + } + } + } + + res = PQgetResult(conn); + if (res == NULL) + break; /* query is complete */ + + PQclear(last_res); + last_res = res; + } + + *result = last_res; + return false; +}