diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index a3921977c5..c7584cb1d3 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3338,6 +3338,9 @@ pgstat_get_wait_client(WaitEventClient w) case WAIT_EVENT_WAL_RECEIVER_WAIT_START: event_name = "WalReceiverWaitStart"; break; + case WAIT_EVENT_LIBPQWALRECEIVER_READ: + event_name = "LibPQWalReceiverRead"; + break; case WAIT_EVENT_WAL_SENDER_WAIT_WAL: event_name = "WalSenderWaitForWAL"; break; diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index f1c843e868..6c01e7b991 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -23,19 +23,11 @@ #include "pqexpbuffer.h" #include "access/xlog.h" #include "miscadmin.h" +#include "pgstat.h" #include "replication/walreceiver.h" +#include "storage/proc.h" #include "utils/builtins.h" -#ifdef HAVE_POLL_H -#include -#endif -#ifdef HAVE_SYS_POLL_H -#include -#endif -#ifdef HAVE_SYS_SELECT_H -#include -#endif - PG_MODULE_MAGIC; void _PG_init(void); @@ -59,7 +51,6 @@ static void libpqrcv_send(const char *buffer, int nbytes); static void libpqrcv_disconnect(void); /* Prototypes for private functions */ -static bool libpq_select(int timeout_ms); static PGresult *libpqrcv_PQexec(const char *query); /* @@ -366,67 +357,6 @@ libpqrcv_readtimelinehistoryfile(TimeLineID tli, PQclear(res); } -/* - * Wait until we can read WAL stream, or timeout. - * - * Returns true if data has become available for reading, false if timed out - * or interrupted by signal. - * - * This is based on pqSocketCheck. - */ -static bool -libpq_select(int timeout_ms) -{ - int ret; - - Assert(streamConn != NULL); - if (PQsocket(streamConn) < 0) - ereport(ERROR, - (errcode_for_socket_access(), - errmsg("invalid socket: %s", PQerrorMessage(streamConn)))); - - /* We use poll(2) if available, otherwise select(2) */ - { -#ifdef HAVE_POLL - struct pollfd input_fd; - - input_fd.fd = PQsocket(streamConn); - input_fd.events = POLLIN | POLLERR; - input_fd.revents = 0; - - ret = poll(&input_fd, 1, timeout_ms); -#else /* !HAVE_POLL */ - - fd_set input_mask; - struct timeval timeout; - struct timeval *ptr_timeout; - - FD_ZERO(&input_mask); - FD_SET(PQsocket(streamConn), &input_mask); - - if (timeout_ms < 0) - ptr_timeout = NULL; - else - { - timeout.tv_sec = timeout_ms / 1000; - timeout.tv_usec = (timeout_ms % 1000) * 1000; - ptr_timeout = &timeout; - } - - ret = select(PQsocket(streamConn) + 1, &input_mask, - NULL, NULL, ptr_timeout); -#endif /* HAVE_POLL */ - } - - if (ret == 0 || (ret < 0 && errno == EINTR)) - return false; - if (ret < 0) - ereport(ERROR, - (errcode_for_socket_access(), - errmsg("select() failed: %m"))); - return true; -} - /* * Send a query and wait for the results by using the asynchronous libpq * functions and the backend version of select(). @@ -470,14 +400,31 @@ libpqrcv_PQexec(const char *query) */ while (PQisBusy(streamConn)) { + int rc; + /* * We don't need to break down the sleep into smaller increments, - * and check for interrupts after each nap, since we can just - * elog(FATAL) within SIGTERM signal handler if the signal arrives - * in the middle of establishment of replication connection. + * since we'll get interrupted by signals and can either handle + * interrupts here or elog(FATAL) within SIGTERM signal handler if + * the signal arrives in the middle of establishment of + * replication connection. */ - if (!libpq_select(-1)) - continue; /* interrupted */ + ResetLatch(&MyProc->procLatch); + rc = WaitLatchOrSocket(&MyProc->procLatch, + WL_POSTMASTER_DEATH | WL_SOCKET_READABLE | + WL_LATCH_SET, + PQsocket(streamConn), + 0, + WAIT_EVENT_LIBPQWALRECEIVER_READ); + if (rc & WL_POSTMASTER_DEATH) + exit(1); + + /* interrupted */ + if (rc & WL_LATCH_SET) + { + CHECK_FOR_INTERRUPTS(); + continue; + } if (PQconsumeInput(streamConn) == 0) return NULL; /* trouble */ } diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 2bb3dce1b1..8bfb041560 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -261,7 +261,7 @@ WalReceiverMain(void) /* Arrange to clean up at walreceiver exit */ on_shmem_exit(WalRcvDie, 0); - OwnLatch(&walrcv->latch); + walrcv->latch = &MyProc->procLatch; /* Properly accept or ignore signals the postmaster might send us */ pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config @@ -483,7 +483,7 @@ WalReceiverMain(void) * avoiding some system calls. */ Assert(wait_fd != PGINVALID_SOCKET); - rc = WaitLatchOrSocket(&walrcv->latch, + rc = WaitLatchOrSocket(walrcv->latch, WL_POSTMASTER_DEATH | WL_SOCKET_READABLE | WL_TIMEOUT | WL_LATCH_SET, wait_fd, @@ -491,7 +491,7 @@ WalReceiverMain(void) WAIT_EVENT_WAL_RECEIVER_MAIN); if (rc & WL_LATCH_SET) { - ResetLatch(&walrcv->latch); + ResetLatch(walrcv->latch); if (walrcv->force_reply) { /* @@ -652,7 +652,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI) WakeupRecovery(); for (;;) { - ResetLatch(&walrcv->latch); + ResetLatch(walrcv->latch); /* * Emergency bailout if postmaster has died. This is to avoid the @@ -687,7 +687,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI) } SpinLockRelease(&walrcv->mutex); - WaitLatch(&walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0, + WaitLatch(walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0, WAIT_EVENT_WAL_RECEIVER_WAIT_START); } @@ -763,7 +763,7 @@ WalRcvDie(int code, Datum arg) /* Ensure that all WAL records received are flushed to disk */ XLogWalRcvFlush(true); - DisownLatch(&walrcv->latch); + walrcv->latch = NULL; SpinLockAcquire(&walrcv->mutex); Assert(walrcv->walRcvState == WALRCV_STREAMING || @@ -812,7 +812,8 @@ WalRcvShutdownHandler(SIGNAL_ARGS) got_SIGTERM = true; - SetLatch(&WalRcv->latch); + if (WalRcv->latch) + SetLatch(WalRcv->latch); /* Don't joggle the elbow of proc_exit */ if (!proc_exit_inprogress && WalRcvImmediateInterruptOK) @@ -1297,7 +1298,8 @@ void WalRcvForceReply(void) { WalRcv->force_reply = true; - SetLatch(&WalRcv->latch); + if (WalRcv->latch) + SetLatch(WalRcv->latch); } /* diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index 5f6e423f1f..01111a4c12 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -64,7 +64,7 @@ WalRcvShmemInit(void) MemSet(WalRcv, 0, WalRcvShmemSize()); WalRcv->walRcvState = WALRCV_STOPPED; SpinLockInit(&WalRcv->mutex); - InitSharedLatch(&WalRcv->latch); + WalRcv->latch = NULL; } } @@ -279,8 +279,8 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, if (launch) SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER); - else - SetLatch(&walrcv->latch); + else if (walrcv->latch) + SetLatch(walrcv->latch); } /* diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 0b85b7ad3a..152ff06208 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -763,6 +763,7 @@ typedef enum WAIT_EVENT_CLIENT_WRITE, WAIT_EVENT_SSL_OPEN_SERVER, WAIT_EVENT_WAL_RECEIVER_WAIT_START, + WAIT_EVENT_LIBPQWALRECEIVER_READ, WAIT_EVENT_WAL_SENDER_WAIT_WAL, WAIT_EVENT_WAL_SENDER_WRITE_DATA } WaitEventClient; diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index cd787c92b3..afbb8d8b95 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -127,8 +127,9 @@ typedef struct * where to start streaming (after setting receiveStart and * receiveStartTLI), and also to tell it to send apply feedback to the * primary whenever specially marked commit records are applied. + * This is normally mapped to procLatch when walreceiver is running. */ - Latch latch; + Latch *latch; } WalRcvData; extern WalRcvData *WalRcv;