diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 29a25eb903..74357ace29 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -169,7 +169,7 @@ static void WalSndLoop(void); static void InitWalSenderSlot(void); static void WalSndKill(int code, Datum arg); static void XLogSend(bool *caughtup); -static XLogRecPtr GetStandbyFlushRecPtr(TimeLineID currentTLI); +static XLogRecPtr GetStandbyFlushRecPtr(void); static void IdentifySystem(void); static void StartReplication(StartReplicationCmd *cmd); static void ProcessStandbyMessage(void); @@ -250,7 +250,7 @@ IdentifySystem(void) if (am_cascading_walsender) { /* this also updates ThisTimeLineID */ - logptr = GetStandbyFlushRecPtr(0); + logptr = GetStandbyFlushRecPtr(); } else logptr = GetInsertRecPtr(); @@ -423,7 +423,7 @@ StartReplication(StartReplicationCmd *cmd) if (am_cascading_walsender) { /* this also updates ThisTimeLineID */ - FlushPtr = GetStandbyFlushRecPtr(0); + FlushPtr = GetStandbyFlushRecPtr(); } else FlushPtr = GetFlushRecPtr(); @@ -1310,7 +1310,6 @@ static void XLogSend(bool *caughtup) { XLogRecPtr SendRqstPtr; - XLogRecPtr FlushPtr; XLogRecPtr startptr; XLogRecPtr endptr; Size nbytes; @@ -1321,33 +1320,39 @@ XLogSend(bool *caughtup) return; } - /* - * Attempt to send all data that's already been written out and fsync'd to - * disk. We cannot go further than what's been written out given the - * current implementation of XLogRead(). And in any case it's unsafe to - * send WAL that is not securely down to disk on the master: if the master - * subsequently crashes and restarts, slaves must not have applied any WAL - * that gets lost on the master. - */ - if (am_cascading_walsender) - FlushPtr = GetStandbyFlushRecPtr(sendTimeLine); - else - FlushPtr = GetFlushRecPtr(); - - /* - * In a cascading standby, the current recovery target timeline can - * change, or we can be promoted. 
In either case, the current timeline - * becomes historic. We need to detect that so that we don't try to stream - * past the point where we switched to another timeline. It's checked - * after calculating FlushPtr, to avoid a race condition: if the timeline - * becomes historic just after we checked that it was still current, it - * should still be OK to stream it up to the FlushPtr that was calculated - * before it became historic. - */ - if (!sendTimeLineIsHistoric && am_cascading_walsender) + /* Figure out how far we can safely send the WAL. */ + if (sendTimeLineIsHistoric) { + /* + * Streaming an old timeline that's in this server's history, + * but is not the one we're currently inserting or replaying. It can + * be streamed up to the point where we switched off that timeline. + */ + SendRqstPtr = sendTimeLineValidUpto; + } + else if (am_cascading_walsender) + { + /* + * Streaming the latest timeline on a standby. + * + * Attempt to send all WAL that has already been replayed, so that + * we know it's valid. If we're receiving WAL through streaming + * replication, it's also OK to send any WAL that has been received + * but not replayed. + * + * The timeline we're recovering from can change, or we can be + * promoted. In either case, the current timeline becomes historic. + * We need to detect that so that we don't try to stream past the + * point where we switched to another timeline. We check for promotion + * or timeline switch after calculating FlushPtr, to avoid a race + * condition: if the timeline becomes historic just after we checked + * that it was still current, it should still be OK to stream it up to the + * FlushPtr that was calculated before it became historic. + */ bool becameHistoric = false; + SendRqstPtr = GetStandbyFlushRecPtr(); + if (!RecoveryInProgress()) { /* @@ -1361,7 +1366,8 @@ XLogSend(bool *caughtup) { /* * Still a cascading standby. But is the timeline we're sending - * still the one recovery is recovering from? 
+ * still the one recovery is recovering from? ThisTimeLineID was + * updated by the GetStandbyFlushRecPtr() call above. */ if (sendTimeLine != ThisTimeLineID) becameHistoric = true; @@ -1391,8 +1397,24 @@ XLogSend(bool *caughtup) (uint32) sentPtr); sendTimeLineIsHistoric = true; + + SendRqstPtr = sendTimeLineValidUpto; } } + else + { + /* + * Streaming the current timeline on a master. + * + * Attempt to send all data that's already been written out and + * fsync'd to disk. We cannot go further than what's been written out + * given the current implementation of XLogRead(). And in any case + * it's unsafe to send WAL that is not securely down to disk on the + * master: if the master subsequently crashes and restarts, slaves + * must not have applied any WAL that gets lost on the master. + */ + SendRqstPtr = GetFlushRecPtr(); + } /* * If this is a historic timeline and we've reached the point where we @@ -1413,15 +1435,7 @@ XLogSend(bool *caughtup) return; } - /* - * Stream up to the point known to be flushed to disk, or to the end of - * this timeline, whichever comes first. - */ - if (sendTimeLineIsHistoric && XLByteLT(sendTimeLineValidUpto, FlushPtr)) - SendRqstPtr = sendTimeLineValidUpto; - else - SendRqstPtr = FlushPtr; - + /* Do we have any work to do? */ Assert(XLByteLE(sentPtr, SendRqstPtr)); if (XLByteLE(SendRqstPtr, sentPtr)) { @@ -1522,15 +1536,11 @@ XLogSend(bool *caughtup) * can be sent to the standby. This should only be called when in recovery, * ie. we're streaming to a cascaded standby. * - * If currentTLI is non-zero, the function returns the point that the WAL on - * the given timeline has been flushed upto. If recovery has already switched - * to a different timeline, InvalidXLogRecPtr is returned. - * * As a side-effect, ThisTimeLineID is updated to the TLI of the last * replayed WAL record. 
*/ static XLogRecPtr -GetStandbyFlushRecPtr(TimeLineID currentTLI) +GetStandbyFlushRecPtr(void) { XLogRecPtr replayPtr; TimeLineID replayTLI; @@ -1549,11 +1559,8 @@ GetStandbyFlushRecPtr(TimeLineID currentTLI) ThisTimeLineID = replayTLI; - if (currentTLI != replayTLI && currentTLI != 0) - return InvalidXLogRecPtr; - result = replayPtr; - if (receiveTLI == currentTLI && receivePtr > replayPtr) + if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr) result = receivePtr; return result;