diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 6cbb67c92a..8bd2ba37dd 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -95,8 +95,6 @@ bool hot_standby_feedback; static WalReceiverConn *wrconn = NULL; WalReceiverFunctionsType *WalReceiverFunctions = NULL; -#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */ - /* * These variables are used similarly to openLogFile/SegNo, * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID @@ -116,6 +114,23 @@ static struct XLogRecPtr Flush; /* last byte + 1 flushed in the standby */ } LogstreamResult; +/* + * Reasons to wake up and perform periodic tasks. + */ +typedef enum WalRcvWakeupReason +{ + WALRCV_WAKEUP_TERMINATE, + WALRCV_WAKEUP_PING, + WALRCV_WAKEUP_REPLY, + WALRCV_WAKEUP_HSFEEDBACK, + NUM_WALRCV_WAKEUPS +} WalRcvWakeupReason; + +/* + * Wake up times for periodic tasks. + */ +static TimestampTz wakeup[NUM_WALRCV_WAKEUPS]; + static StringInfoData reply_message; static StringInfoData incoming_message; @@ -132,6 +147,7 @@ static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli); static void XLogWalRcvSendReply(bool force, bool requestReply); static void XLogWalRcvSendHSFeedback(bool immed); static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); +static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now); /* * Process any interrupts the walreceiver process may have received. @@ -179,9 +195,7 @@ WalReceiverMain(void) TimeLineID primaryTLI; bool first_stream; WalRcvData *walrcv = WalRcv; - TimestampTz last_recv_timestamp; - TimestampTz starttime; - bool ping_sent; + TimestampTz now; char *err; char *sender_host = NULL; int sender_port = 0; @@ -192,7 +206,7 @@ WalReceiverMain(void) */ Assert(walrcv != NULL); - starttime = GetCurrentTimestamp(); + now = GetCurrentTimestamp(); /* * Mark walreceiver as running in shared memory. @@ -248,7 +262,7 @@ WalReceiverMain(void) /* Initialise to a sanish value */ walrcv->lastMsgSendTime = - walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = starttime; + walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now; /* Report the latch to use to awaken this process */ walrcv->latch = &MyProc->procLatch; @@ -414,9 +428,10 @@ WalReceiverMain(void) initStringInfo(&reply_message); initStringInfo(&incoming_message); - /* Initialize the last recv timestamp */ - last_recv_timestamp = GetCurrentTimestamp(); - ping_sent = false; + /* Initialize nap wakeup times. */ + now = GetCurrentTimestamp(); + for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i) + WalRcvComputeNextWakeup(i, now); /* Loop until end-of-streaming or error */ for (;;) @@ -426,6 +441,8 @@ WalReceiverMain(void) bool endofwal = false; pgsocket wait_fd = PGINVALID_SOCKET; int rc; + TimestampTz nextWakeup; + int nap; /* * Exit walreceiver if we're not in recovery. This should not @@ -443,11 +460,15 @@ WalReceiverMain(void) { ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); + now = GetCurrentTimestamp(); + for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i) + WalRcvComputeNextWakeup(i, now); XLogWalRcvSendHSFeedback(true); } /* See if we can read data immediately */ len = walrcv_receive(wrconn, &buf, &wait_fd); + now = GetCurrentTimestamp(); if (len != 0) { /* @@ -459,11 +480,12 @@ WalReceiverMain(void) if (len > 0) { /* - * Something was received from primary, so reset - * timeout + * Something was received from primary, so adjust + * the ping and terminate wakeup times. */ - last_recv_timestamp = GetCurrentTimestamp(); - ping_sent = false; + WalRcvComputeNextWakeup(WALRCV_WAKEUP_TERMINATE, + now); + WalRcvComputeNextWakeup(WALRCV_WAKEUP_PING, now); XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1, startpointTLI); } @@ -480,6 +502,7 @@ WalReceiverMain(void) break; } len = walrcv_receive(wrconn, &buf, &wait_fd); + now = GetCurrentTimestamp(); } /* Let the primary know that we received some data. */ @@ -497,6 +520,20 @@ WalReceiverMain(void) if (endofwal) break; + /* Find the soonest wakeup time, to limit our nap. */ + nextWakeup = PG_INT64_MAX; + for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i) + nextWakeup = Min(wakeup[i], nextWakeup); + + /* + * Calculate the nap time. WaitLatchOrSocket() doesn't accept + * timeouts longer than INT_MAX milliseconds, so we limit the + * result accordingly. Also, we round up to the next + * millisecond to avoid waking up too early and spinning until + * one of the wakeup times. + */ + nap = (int) Min(INT_MAX, Max(0, (nextWakeup - now + 999) / 1000)); + /* * Ideally we would reuse a WaitEventSet object repeatedly * here to avoid the overheads of WaitLatchOrSocket on epoll @@ -513,8 +550,9 @@ WalReceiverMain(void) WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE | WL_TIMEOUT | WL_LATCH_SET, wait_fd, - NAPTIME_PER_CYCLE, + nap, WAIT_EVENT_WAL_RECEIVER_MAIN); + now = GetCurrentTimestamp(); if (rc & WL_LATCH_SET) { ResetLatch(MyLatch); @@ -550,34 +588,19 @@ WalReceiverMain(void) * Check if time since last receive from primary has * reached the configured limit. */ - if (wal_receiver_timeout > 0) + if (now >= wakeup[WALRCV_WAKEUP_TERMINATE]) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("terminating walreceiver due to timeout"))); + + /* + * We didn't receive anything new, for half of receiver + * replication timeout. Ping the server. + */ + if (now >= wakeup[WALRCV_WAKEUP_PING]) { - TimestampTz now = GetCurrentTimestamp(); - TimestampTz timeout; - - timeout = - TimestampTzPlusMilliseconds(last_recv_timestamp, - wal_receiver_timeout); - - if (now >= timeout) - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("terminating walreceiver due to timeout"))); - - /* - * We didn't receive anything new, for half of - * receiver replication timeout. Ping the server. - */ - if (!ping_sent) - { - timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, - (wal_receiver_timeout / 2)); - if (now >= timeout) - { - requestReply = true; - ping_sent = true; - } - } + requestReply = true; + wakeup[WALRCV_WAKEUP_PING] = PG_INT64_MAX; } XLogWalRcvSendReply(requestReply, requestReply); @@ -1076,7 +1099,6 @@ XLogWalRcvSendReply(bool force, bool requestReply) static XLogRecPtr writePtr = 0; static XLogRecPtr flushPtr = 0; XLogRecPtr applyPtr; - static TimestampTz sendTime = 0; TimestampTz now; /* @@ -1101,10 +1123,11 @@ XLogWalRcvSendReply(bool force, bool requestReply) if (!force && writePtr == LogstreamResult.Write && flushPtr == LogstreamResult.Flush - && !TimestampDifferenceExceeds(sendTime, now, - wal_receiver_status_interval * 1000)) + && now < wakeup[WALRCV_WAKEUP_REPLY]) return; - sendTime = now; + + /* Make sure we wake up when it's time to send another reply. */ + WalRcvComputeNextWakeup(WALRCV_WAKEUP_REPLY, now); /* Construct a new message */ writePtr = LogstreamResult.Write; @@ -1149,7 +1172,6 @@ XLogWalRcvSendHSFeedback(bool immed) catalog_xmin_epoch; TransactionId xmin, catalog_xmin; - static TimestampTz sendTime = 0; /* initially true so we always send at least one feedback message */ static bool primary_has_standby_xmin = true; @@ -1165,16 +1187,12 @@ XLogWalRcvSendHSFeedback(bool immed) /* Get current timestamp. */ now = GetCurrentTimestamp(); - if (!immed) - { - /* - * Send feedback at most once per wal_receiver_status_interval. - */ - if (!TimestampDifferenceExceeds(sendTime, now, - wal_receiver_status_interval * 1000)) - return; - sendTime = now; - } + /* Send feedback at most once per wal_receiver_status_interval. */ + if (!immed && now < wakeup[WALRCV_WAKEUP_HSFEEDBACK]) + return; + + /* Make sure we wake up when it's time to send feedback again. */ + WalRcvComputeNextWakeup(WALRCV_WAKEUP_HSFEEDBACK, now); /* * If Hot Standby is not yet accepting connections there is nothing to @@ -1285,6 +1303,45 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) } } +/* + * Compute the next wakeup time for a given wakeup reason. Can be called to + * initialize a wakeup time, to adjust it for the next wakeup, or to + * reinitialize it when GUCs have changed. + */ +static void +WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now) +{ + switch (reason) + { + case WALRCV_WAKEUP_TERMINATE: + if (wal_receiver_timeout <= 0) + wakeup[reason] = PG_INT64_MAX; + else + wakeup[reason] = now + wal_receiver_timeout * INT64CONST(1000); + break; + case WALRCV_WAKEUP_PING: + if (wal_receiver_timeout <= 0) + wakeup[reason] = PG_INT64_MAX; + else + wakeup[reason] = now + (wal_receiver_timeout / 2) * INT64CONST(1000); + break; + case WALRCV_WAKEUP_HSFEEDBACK: + if (!hot_standby_feedback || wal_receiver_status_interval <= 0) + wakeup[reason] = PG_INT64_MAX; + else + wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000); + break; + case WALRCV_WAKEUP_REPLY: + if (wal_receiver_status_interval <= 0) + wakeup[reason] = PG_INT64_MAX; + else + wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000); + break; + default: + break; + } +} + /* * Wake up the walreceiver main loop. * diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 9683b0a88e..245aea1dd1 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2927,6 +2927,7 @@ WALInsertLock WALInsertLockPadded WALOpenSegment WALReadError +WalRcvWakeupReason WALSegmentCloseCB WALSegmentContext WALSegmentOpenCB