From 1e8a850094478a2036891fa3d4ce769bce411ee3 Mon Sep 17 00:00:00 2001 From: Peter Eisentraut Date: Fri, 3 Mar 2017 09:07:22 -0500 Subject: [PATCH] Use asynchronous connect API in libpqwalreceiver This makes the connection attempt from CREATE SUBSCRIPTION and from WalReceiver interruptable by the user in case the libpq connection is hanging. The previous coding required immediate shutdown (SIGQUIT) of PostgreSQL in that situation. From: Petr Jelinek Tested-by: Thom Brown --- src/backend/postmaster/pgstat.c | 4 +- .../libpqwalreceiver/libpqwalreceiver.c | 49 ++++++++++++++++++- src/include/pgstat.h | 2 +- 3 files changed, 50 insertions(+), 5 deletions(-) diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index ada374c0c4..2fb9a8bf58 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3340,8 +3340,8 @@ pgstat_get_wait_client(WaitEventClient w) case WAIT_EVENT_WAL_RECEIVER_WAIT_START: event_name = "WalReceiverWaitStart"; break; - case WAIT_EVENT_LIBPQWALRECEIVER_READ: - event_name = "LibPQWalReceiverRead"; + case WAIT_EVENT_LIBPQWALRECEIVER: + event_name = "LibPQWalReceiver"; break; case WAIT_EVENT_WAL_SENDER_WAIT_WAL: event_name = "WalSenderWaitForWAL"; diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index daae3f70e7..048d2aaa76 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -113,6 +113,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, char **err) { WalReceiverConn *conn; + PostgresPollingStatusType status; const char *keys[5]; const char *vals[5]; int i = 0; @@ -146,7 +147,51 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, Assert(i < sizeof(keys)); conn = palloc0(sizeof(WalReceiverConn)); - conn->streamConn = PQconnectdbParams(keys, vals, /* expand_dbname = */ true); + conn->streamConn = PQconnectStartParams(keys, vals, + /* expand_dbname = */ true); + if (PQstatus(conn->streamConn) == CONNECTION_BAD) + { + *err = pchomp(PQerrorMessage(conn->streamConn)); + return NULL; + } + + /* Poll connection. */ + do + { + /* Determine current state of the connection. */ + status = PQconnectPoll(conn->streamConn); + + /* Sleep a bit if waiting for socket. */ + if (status == PGRES_POLLING_READING || + status == PGRES_POLLING_WRITING) + { + int extra_flag; + int rc; + + extra_flag = (status == PGRES_POLLING_READING + ? WL_SOCKET_READABLE + : WL_SOCKET_WRITEABLE); + + ResetLatch(&MyProc->procLatch); + rc = WaitLatchOrSocket(&MyProc->procLatch, + WL_POSTMASTER_DEATH | + WL_LATCH_SET | extra_flag, + PQsocket(conn->streamConn), + 0, + WAIT_EVENT_LIBPQWALRECEIVER); + + /* Emergency bailout. */ + if (rc & WL_POSTMASTER_DEATH) + exit(1); + + /* Interrupted. */ + if (rc & WL_LATCH_SET) + CHECK_FOR_INTERRUPTS(); + } + + /* Otherwise loop until we have OK or FAILED status. */ + } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED); + if (PQstatus(conn->streamConn) != CONNECTION_OK) { *err = pchomp(PQerrorMessage(conn->streamConn)); @@ -529,7 +574,7 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query) WL_LATCH_SET, PQsocket(streamConn), 0, - WAIT_EVENT_LIBPQWALRECEIVER_READ); + WAIT_EVENT_LIBPQWALRECEIVER); if (rc & WL_POSTMASTER_DEATH) exit(1); diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 8b710ecb24..0062fb8af2 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -764,7 +764,7 @@ typedef enum WAIT_EVENT_CLIENT_WRITE, WAIT_EVENT_SSL_OPEN_SERVER, WAIT_EVENT_WAL_RECEIVER_WAIT_START, - WAIT_EVENT_LIBPQWALRECEIVER_READ, + WAIT_EVENT_LIBPQWALRECEIVER, WAIT_EVENT_WAL_SENDER_WAIT_WAL, WAIT_EVENT_WAL_SENDER_WRITE_DATA } WaitEventClient;