diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index ada374c0c4..2fb9a8bf58 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3340,8 +3340,8 @@ pgstat_get_wait_client(WaitEventClient w) case WAIT_EVENT_WAL_RECEIVER_WAIT_START: event_name = "WalReceiverWaitStart"; break; - case WAIT_EVENT_LIBPQWALRECEIVER_READ: - event_name = "LibPQWalReceiverRead"; + case WAIT_EVENT_LIBPQWALRECEIVER: + event_name = "LibPQWalReceiver"; break; case WAIT_EVENT_WAL_SENDER_WAIT_WAL: event_name = "WalSenderWaitForWAL"; diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index daae3f70e7..048d2aaa76 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -113,6 +113,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, char **err) { WalReceiverConn *conn; + PostgresPollingStatusType status; const char *keys[5]; const char *vals[5]; int i = 0; @@ -146,7 +147,51 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, Assert(i < sizeof(keys)); conn = palloc0(sizeof(WalReceiverConn)); - conn->streamConn = PQconnectdbParams(keys, vals, /* expand_dbname = */ true); + conn->streamConn = PQconnectStartParams(keys, vals, + /* expand_dbname = */ true); + if (PQstatus(conn->streamConn) == CONNECTION_BAD) + { + *err = pchomp(PQerrorMessage(conn->streamConn)); + return NULL; + } + + /* Poll connection. */ + do + { + /* Determine current state of the connection. */ + status = PQconnectPoll(conn->streamConn); + + /* Sleep a bit if waiting for socket. */ + if (status == PGRES_POLLING_READING || + status == PGRES_POLLING_WRITING) + { + int extra_flag; + int rc; + + extra_flag = (status == PGRES_POLLING_READING + ? WL_SOCKET_READABLE + : WL_SOCKET_WRITEABLE); + + ResetLatch(&MyProc->procLatch); + rc = WaitLatchOrSocket(&MyProc->procLatch, + WL_POSTMASTER_DEATH | + WL_LATCH_SET | extra_flag, + PQsocket(conn->streamConn), + 0, + WAIT_EVENT_LIBPQWALRECEIVER); + + /* Emergency bailout. */ + if (rc & WL_POSTMASTER_DEATH) + exit(1); + + /* Interrupted. */ + if (rc & WL_LATCH_SET) + CHECK_FOR_INTERRUPTS(); + } + + /* Otherwise loop until we have OK or FAILED status. */ + } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED); + if (PQstatus(conn->streamConn) != CONNECTION_OK) { *err = pchomp(PQerrorMessage(conn->streamConn)); @@ -529,7 +574,7 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query) WL_LATCH_SET, PQsocket(streamConn), 0, - WAIT_EVENT_LIBPQWALRECEIVER_READ); + WAIT_EVENT_LIBPQWALRECEIVER); if (rc & WL_POSTMASTER_DEATH) exit(1); diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 8b710ecb24..0062fb8af2 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -764,7 +764,7 @@ typedef enum WAIT_EVENT_CLIENT_WRITE, WAIT_EVENT_SSL_OPEN_SERVER, WAIT_EVENT_WAL_RECEIVER_WAIT_START, - WAIT_EVENT_LIBPQWALRECEIVER_READ, + WAIT_EVENT_LIBPQWALRECEIVER, WAIT_EVENT_WAL_SENDER_WAIT_WAL, WAIT_EVENT_WAL_SENDER_WRITE_DATA } WaitEventClient;