diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 4023471434..2243ca2a36 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -21,6 +21,7 @@ #include "miscadmin.h" #include "storage/latch.h" #include "utils/hsearch.h" +#include "utils/inval.h" #include "utils/memutils.h" #include "utils/syscache.h" @@ -47,11 +48,15 @@ typedef struct ConnCacheEntry { ConnCacheKey key; /* hash key (must be first) */ PGconn *conn; /* connection to foreign server, or NULL */ + /* Remaining fields are invalid when conn is NULL: */ int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 = * one level of subxact open, etc */ bool have_prep_stmt; /* have we prepared any stmts in this xact? */ bool have_error; /* have any subxacts aborted in this xact? */ bool changing_xact_state; /* xact state change in process */ + bool invalidated; /* true if reconnect is pending */ + uint32 server_hashvalue; /* hash value of foreign server OID */ + uint32 mapping_hashvalue; /* hash value of user mapping OID */ } ConnCacheEntry; /* @@ -68,6 +73,7 @@ static bool xact_got_connection = false; /* prototypes of private functions */ static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user); +static void disconnect_pg_server(ConnCacheEntry *entry); static void check_conn_params(const char **keywords, const char **values); static void configure_remote_session(PGconn *conn); static void do_sql_command(PGconn *conn, const char *sql); @@ -77,6 +83,7 @@ static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg); +static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue); static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry); static bool pgfdw_cancel_query(PGconn *conn); static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, @@ -94,13 +101,6 @@ static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, * will_prep_stmt must be true if caller intends to create any prepared * statements. Since those don't go away automatically at transaction end * (not even on error), we need this flag to cue manual cleanup. - * - * XXX Note that caching connections theoretically requires a mechanism to - * detect change of FDW objects to invalidate already established connections. - * We could manage that by watching for invalidation events on the relevant - * syscaches. For the moment, though, it's not clear that this would really - * be useful and not mere pedantry. We could not flush any active connections - * mid-transaction anyway. */ PGconn * GetConnection(UserMapping *user, bool will_prep_stmt) @@ -129,6 +129,10 @@ GetConnection(UserMapping *user, bool will_prep_stmt) */ RegisterXactCallback(pgfdw_xact_callback, NULL); RegisterSubXactCallback(pgfdw_subxact_callback, NULL); + CacheRegisterSyscacheCallback(FOREIGNSERVEROID, + pgfdw_inval_callback, (Datum) 0); + CacheRegisterSyscacheCallback(USERMAPPINGOID, + pgfdw_inval_callback, (Datum) 0); } /* Set flag that we did GetConnection during the current transaction */ @@ -143,17 +147,27 @@ GetConnection(UserMapping *user, bool will_prep_stmt) entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found); if (!found) { - /* initialize new hashtable entry (key is already filled in) */ + /* + * We need only clear "conn" here; remaining fields will be filled + * later when "conn" is set. + */ entry->conn = NULL; - entry->xact_depth = 0; - entry->have_prep_stmt = false; - entry->have_error = false; - entry->changing_xact_state = false; } /* Reject further use of connections which failed abort cleanup. */ pgfdw_reject_incomplete_xact_state_change(entry); + /* + * If the connection needs to be remade due to invalidation, disconnect as + * soon as we're out of all transactions. + */ + if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0) + { + elog(DEBUG3, "closing connection %p for option changes to take effect", + entry->conn); + disconnect_pg_server(entry); + } + /* * We don't check the health of cached connection here, because it would * require some overhead. Broken connection will be detected when the @@ -163,15 +177,26 @@ GetConnection(UserMapping *user, bool will_prep_stmt) /* * If cache entry doesn't have a connection, we have to establish a new * connection. (If connect_pg_server throws an error, the cache entry - * will be left in a valid empty state.) + * will remain in a valid empty state, ie conn == NULL.) */ if (entry->conn == NULL) { ForeignServer *server = GetForeignServer(user->serverid); - entry->xact_depth = 0; /* just to be sure */ + /* Reset all transient state fields, to be sure all are clean */ + entry->xact_depth = 0; entry->have_prep_stmt = false; entry->have_error = false; + entry->changing_xact_state = false; + entry->invalidated = false; + entry->server_hashvalue = + GetSysCacheHashValue1(FOREIGNSERVEROID, + ObjectIdGetDatum(server->serverid)); + entry->mapping_hashvalue = + GetSysCacheHashValue1(USERMAPPINGOID, + ObjectIdGetDatum(user->umid)); + + /* Now try to make the connection */ entry->conn = connect_pg_server(server, user); elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)", @@ -285,6 +310,19 @@ connect_pg_server(ForeignServer *server, UserMapping *user) return conn; } +/* + * Disconnect any open connection for a connection cache entry. + */ +static void +disconnect_pg_server(ConnCacheEntry *entry) +{ + if (entry->conn != NULL) + { + PQfinish(entry->conn); + entry->conn = NULL; + } +} + /* * For non-superusers, insist that the connstr specify a password. This * prevents a password from being picked up from .pgpass, a service file, @@ -786,9 +824,7 @@ pgfdw_xact_callback(XactEvent event, void *arg) entry->changing_xact_state) { elog(DEBUG3, "discarding connection %p", entry->conn); - PQfinish(entry->conn); - entry->conn = NULL; - entry->changing_xact_state = false; + disconnect_pg_server(entry); } } @@ -905,6 +941,47 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, } } +/* + * Connection invalidation callback function + * + * After a change to a pg_foreign_server or pg_user_mapping catalog entry, + * mark connections depending on that entry as needing to be remade. + * We can't immediately destroy them, since they might be in the midst of + * a transaction, but we'll remake them at the next opportunity. + * + * Although most cache invalidation callbacks blow away all the related stuff + * regardless of the given hashvalue, connections are expensive enough that + * it's worth trying to avoid that. + * + * NB: We could avoid unnecessary disconnection more strictly by examining + * individual option values, but it seems too much effort for the gain. + */ +static void +pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue) +{ + HASH_SEQ_STATUS scan; + ConnCacheEntry *entry; + + Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID); + + /* ConnectionHash must exist already, if we're registered */ + hash_seq_init(&scan, ConnectionHash); + while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) + { + /* Ignore invalid entries */ + if (entry->conn == NULL) + continue; + + /* hashvalue == 0 means a cache reset, must clear all state */ + if (hashvalue == 0 || + (cacheid == FOREIGNSERVEROID && + entry->server_hashvalue == hashvalue) || + (cacheid == USERMAPPINGOID && + entry->mapping_hashvalue == hashvalue)) + entry->invalidated = true; + } +} + /* * Raise an error if the given connection cache entry is marked as being * in the middle of an xact state change. This should be called at which no @@ -922,9 +999,14 @@ pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry) Form_pg_user_mapping umform; ForeignServer *server; - if (!entry->changing_xact_state) + /* nothing to do for inactive entries and entries of sane state */ + if (entry->conn == NULL || !entry->changing_xact_state) return; + /* make sure this entry is inactive */ + disconnect_pg_server(entry); + + /* find server name to be shown in the message below */ tup = SearchSysCache1(USERMAPPINGOID, ObjectIdGetDatum(entry->key)); if (!HeapTupleIsValid(tup)) diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index eb6124cccd..00b2e53fa9 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -183,6 +183,43 @@ ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1'); public | ft6 | loopback2 | (schema_name 'S 1', table_name 'T 4') | (5 rows) +-- Test that alteration of server options causes reconnection +SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work + c3 | c4 +-------+------------------------------ + 00001 | Fri Jan 02 00:00:00 1970 PST +(1 row) + +ALTER SERVER loopback OPTIONS (SET dbname 'no such database'); +SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should fail +ERROR: could not connect to server "loopback" +DETAIL: FATAL: database "no such database" does not exist +DO $d$ + BEGIN + EXECUTE $$ALTER SERVER loopback + OPTIONS (SET dbname '$$||current_database()||$$')$$; + END; +$d$; +SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again + c3 | c4 +-------+------------------------------ + 00001 | Fri Jan 02 00:00:00 1970 PST +(1 row) + +-- Test that alteration of user mapping options causes reconnection +ALTER USER MAPPING FOR CURRENT_USER SERVER loopback + OPTIONS (ADD user 'no such user'); +SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should fail +ERROR: could not connect to server "loopback" +DETAIL: FATAL: role "no such user" does not exist +ALTER USER MAPPING FOR CURRENT_USER SERVER loopback + OPTIONS (DROP user); +SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again + c3 | c4 +-------+------------------------------ + 00001 | Fri Jan 02 00:00:00 1970 PST +(1 row) + -- Now we should be able to run ANALYZE. -- To exercise multiple code paths, we use local stats on ft1 -- and remote-estimate mode on ft2. diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 176b3a32a8..920f6a3155 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -187,6 +187,26 @@ ALTER FOREIGN TABLE ft1 ALTER COLUMN c1 OPTIONS (column_name 'C 1'); ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1'); \det+ +-- Test that alteration of server options causes reconnection +SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work +ALTER SERVER loopback OPTIONS (SET dbname 'no such database'); +SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should fail +DO $d$ + BEGIN + EXECUTE $$ALTER SERVER loopback + OPTIONS (SET dbname '$$||current_database()||$$')$$; + END; +$d$; +SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again + +-- Test that alteration of user mapping options causes reconnection +ALTER USER MAPPING FOR CURRENT_USER SERVER loopback + OPTIONS (ADD user 'no such user'); +SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should fail +ALTER USER MAPPING FOR CURRENT_USER SERVER loopback + OPTIONS (DROP user); +SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again + -- Now we should be able to run ANALYZE. -- To exercise multiple code paths, we use local stats on ft1 -- and remote-estimate mode on ft2.