diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index ee09468db1..c7f5bd5ea3 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -95,6 +95,7 @@ static struct
 }	LogstreamResult;
 
 static StandbyReplyMessage reply_message;
+static StandbyHSFeedbackMessage feedback_message;
 
 /*
  * About SIGTERM handling:
@@ -123,6 +124,7 @@ static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
 static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
 static void XLogWalRcvFlush(bool dying);
 static void XLogWalRcvSendReply(void);
+static void XLogWalRcvSendHSFeedback(void);
 
 /* Signal handlers */
 static void WalRcvSigHupHandler(SIGNAL_ARGS);
@@ -317,6 +319,7 @@ WalReceiverMain(void)
 
 			/* Let the master know that we received some data. */
 			XLogWalRcvSendReply();
+			XLogWalRcvSendHSFeedback();
 
 			/*
 			 * If we've written some records, flush them to disk and let the
@@ -331,6 +334,7 @@ WalReceiverMain(void)
 			 * the master anyway, to report any progress in applying WAL.
 			 */
 			XLogWalRcvSendReply();
+			XLogWalRcvSendHSFeedback();
 		}
 	}
 }
@@ -619,40 +623,82 @@ XLogWalRcvSendReply(void)
 	reply_message.apply = GetXLogReplayRecPtr();
 	reply_message.sendTime = now;
 
-	/*
-	 * Get the OldestXmin and its associated epoch
-	 */
-	if (hot_standby_feedback && HotStandbyActive())
-	{
-		TransactionId nextXid;
-		uint32		nextEpoch;
-
-		reply_message.xmin = GetOldestXmin(true, false);
-
-		/*
-		 * Get epoch and adjust if nextXid and oldestXmin are different
-		 * sides of the epoch boundary.
-		 */
-		GetNextXidAndEpoch(&nextXid, &nextEpoch);
-		if (nextXid < reply_message.xmin)
-			nextEpoch--;
-		reply_message.epoch = nextEpoch;
-	}
-	else
-	{
-		reply_message.xmin = InvalidTransactionId;
-		reply_message.epoch = 0;
-	}
-
-	elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u",
+	elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
 		 reply_message.write.xlogid, reply_message.write.xrecoff,
 		 reply_message.flush.xlogid, reply_message.flush.xrecoff,
-		 reply_message.apply.xlogid, reply_message.apply.xrecoff,
-		 reply_message.xmin,
-		 reply_message.epoch);
+		 reply_message.apply.xlogid, reply_message.apply.xrecoff);
 
 	/* Prepend with the message type and send it. */
 	buf[0] = 'r';
 	memcpy(&buf[1], &reply_message, sizeof(StandbyReplyMessage));
 	walrcv_send(buf, sizeof(StandbyReplyMessage) + 1);
 }
+
+/*
+ * Send a hot standby feedback message ('h') to the primary, carrying our
+ * oldest xmin, its epoch and the current time.
+ */
+static void
+XLogWalRcvSendHSFeedback(void)
+{
+	char		buf[sizeof(StandbyHSFeedbackMessage) + 1];
+	TimestampTz now;
+	TransactionId nextXid;
+	uint32		nextEpoch;
+	TransactionId xmin;
+
+	/*
+	 * If the user has disabled status reports to the master, or doesn't want
+	 * hot standby feedback, be sure to exit before doing anything at all.
+	 */
+	if (wal_receiver_status_interval <= 0 || !hot_standby_feedback)
+		return;
+
+	/* Get current timestamp. */
+	now = GetCurrentTimestamp();
+
+	/*
+	 * Send feedback at most once per wal_receiver_status_interval.
+	 */
+	if (!TimestampDifferenceExceeds(feedback_message.sendTime, now,
+									wal_receiver_status_interval * 1000))
+		return;
+
+	/*
+	 * If Hot Standby is not yet active there is nothing to send.
+	 * Check this after the interval has expired to reduce number of
+	 * calls.
+	 */
+	if (!HotStandbyActive())
+		return;
+
+	/*
+	 * Make the expensive call to get the oldest xmin once we are
+	 * certain everything else has been checked.
+	 */
+	xmin = GetOldestXmin(true, false);
+
+	/*
+	 * Get epoch and adjust if nextXid and oldestXmin are different
+	 * sides of the epoch boundary.
+	 */
+	GetNextXidAndEpoch(&nextXid, &nextEpoch);
+	if (nextXid < xmin)
+		nextEpoch--;
+
+	/*
+	 * Remember what we send; sendTime also throttles the next message.
+	 */
+	feedback_message.sendTime = now;
+	feedback_message.xmin = xmin;
+	feedback_message.epoch = nextEpoch;
+
+	elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u",
+		 feedback_message.xmin,
+		 feedback_message.epoch);
+
+	/* Prepend with the message type and send it. */
+	buf[0] = 'h';
+	memcpy(&buf[1], &feedback_message, sizeof(StandbyHSFeedbackMessage));
+	walrcv_send(buf, sizeof(StandbyHSFeedbackMessage) + 1);
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index a6a7a1425b..e04d59e1e7 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -116,7 +116,9 @@ static void WalSndKill(int code, Datum arg);
 static bool XLogSend(char *msgbuf, bool *caughtup);
 static void IdentifySystem(void);
 static void StartReplication(StartReplicationCmd * cmd);
+static void ProcessStandbyMessage(void);
 static void ProcessStandbyReplyMessage(void);
+static void ProcessStandbyHSFeedbackMessage(void);
 static void ProcessRepliesIfAny(void);
 
 
@@ -456,42 +458,45 @@ ProcessRepliesIfAny(void)
 	unsigned char firstchar;
 	int			r;
 
-	r = pq_getbyte_if_available(&firstchar);
-	if (r < 0)
+	for (;;)
 	{
-		/* unexpected error or EOF */
-		ereport(COMMERROR,
-				(errcode(ERRCODE_PROTOCOL_VIOLATION),
-				 errmsg("unexpected EOF on standby connection")));
-		proc_exit(0);
-	}
-	if (r == 0)
-	{
-		/* no data available without blocking */
-		return;
-	}
-
-	/* Handle the very limited subset of commands expected in this phase */
-	switch (firstchar)
-	{
-			/*
-			 * 'd' means a standby reply wrapped in a CopyData packet.
-			 */
-		case 'd':
-			ProcessStandbyReplyMessage();
-			break;
-
-			/*
-			 * 'X' means that the standby is closing down the socket.
-			 */
-		case 'X':
-			proc_exit(0);
-
-		default:
-			ereport(FATAL,
+		r = pq_getbyte_if_available(&firstchar);
+		if (r < 0)
+		{
+			/* unexpected error or EOF */
+			ereport(COMMERROR,
 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
-					 errmsg("invalid standby closing message type %d",
-							firstchar)));
+					 errmsg("unexpected EOF on standby connection")));
+			proc_exit(0);
+		}
+		if (r == 0)
+		{
+			/* no data available without blocking */
+			return;
+		}
+
+		/* Handle the very limited subset of commands expected in this phase */
+		switch (firstchar)
+		{
+				/*
+				 * 'd' means a standby reply wrapped in a CopyData packet.
+				 */
+			case 'd':
+				ProcessStandbyMessage();
+				break;
+
+				/*
+				 * 'X' means that the standby is closing down the socket.
+				 */
+			case 'X':
+				proc_exit(0);
+
+			default:
+				ereport(FATAL,
+						(errcode(ERRCODE_PROTOCOL_VIOLATION),
+						 errmsg("invalid standby closing message type %d",
+								firstchar)));
+		}
 	}
 }
 
@@ -499,11 +504,9 @@ ProcessRepliesIfAny(void)
  * Process a status update message received from standby.
  */
 static void
-ProcessStandbyReplyMessage(void)
+ProcessStandbyMessage(void)
 {
-	StandbyReplyMessage reply;
 	char		msgtype;
-	TransactionId newxmin = InvalidTransactionId;
 
 	resetStringInfo(&reply_message);
 
@@ -523,22 +526,39 @@ ProcessStandbyReplyMessage(void)
 	 * one type.
 	 */
 	msgtype = pq_getmsgbyte(&reply_message);
-	if (msgtype != 'r')
+
+	switch (msgtype)
 	{
-		ereport(COMMERROR,
-				(errcode(ERRCODE_PROTOCOL_VIOLATION),
-				 errmsg("unexpected message type %c", msgtype)));
-		proc_exit(0);
+		case 'r':
+			ProcessStandbyReplyMessage();
+			break;
+
+		case 'h':
+			ProcessStandbyHSFeedbackMessage();
+			break;
+
+		default:
+			ereport(COMMERROR,
+					(errcode(ERRCODE_PROTOCOL_VIOLATION),
+					 errmsg("unexpected message type %c", msgtype)));
+			proc_exit(0);
 	}
+}
+
+/*
+ * Regular reply from standby advising of WAL positions on standby server.
+ */
+static void
+ProcessStandbyReplyMessage(void)
+{
+	StandbyReplyMessage reply;
 
 	pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyReplyMessage));
 
-	elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u",
+	elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X",
 		 reply.write.xlogid, reply.write.xrecoff,
 		 reply.flush.xlogid, reply.flush.xrecoff,
-		 reply.apply.xlogid, reply.apply.xrecoff,
-		 reply.xmin,
-		 reply.epoch);
+		 reply.apply.xlogid, reply.apply.xrecoff);
 
 	/*
 	 * Update shared state for this WalSender process
@@ -554,6 +574,22 @@ ProcessStandbyReplyMessage(void)
 		walsnd->apply = reply.apply;
 		SpinLockRelease(&walsnd->mutex);
 	}
+}
+
+/*
+ * Process a hot standby feedback message ('h') carrying the standby's xmin.
+ */
+static void
+ProcessStandbyHSFeedbackMessage(void)
+{
+	StandbyHSFeedbackMessage reply;
+	TransactionId newxmin = InvalidTransactionId;
+
+	pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyHSFeedbackMessage));
+
+	elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
+		 reply.xmin,
+		 reply.epoch);
 
 	/*
 	 * Update the WalSender's proc xmin to allow it to be visible
diff --git a/src/include/replication/walprotocol.h b/src/include/replication/walprotocol.h
index da94b6b2f3..9baca948a2 100644
--- a/src/include/replication/walprotocol.h
+++ b/src/include/replication/walprotocol.h
@@ -56,6 +56,18 @@ typedef struct
 	XLogRecPtr	flush;
 	XLogRecPtr	apply;
 
+	/* Sender's system clock at the time of transmission */
+	TimestampTz sendTime;
+} StandbyReplyMessage;
+
+/*
+ * Hot Standby feedback from standby (message type 'h'). This is wrapped within
+ * a CopyData message at the FE/BE protocol level.
+ *
+ * Note that the data length is not specified here.
+ */
+typedef struct
+{
 	/*
 	 * The current xmin and epoch from the standby, for Hot Standby feedback.
 	 * This may be invalid if the standby-side does not support feedback,
@@ -64,10 +76,9 @@ typedef struct
 	TransactionId xmin;
 	uint32		epoch;
 
-	/* Sender's system clock at the time of transmission */
 	TimestampTz sendTime;
-} StandbyReplyMessage;
+} StandbyHSFeedbackMessage;
 
 /*
  * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.