diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 70b2e1cbeb..c1c3709fb4 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -407,7 +407,6 @@ typedef struct XLogCtlData XLogRecPtr *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */ int XLogCacheBlck; /* highest allocated xlog buffer index */ TimeLineID ThisTimeLineID; - TimeLineID RecoveryTargetTLI; /* * archiveCleanupCommand is read from recovery.conf but needs to be in @@ -456,14 +455,14 @@ typedef struct XLogCtlData XLogRecPtr recoveryLastRecPtr; /* timestamp of last COMMIT/ABORT record replayed (or being replayed) */ TimestampTz recoveryLastXTime; + /* current effective recovery target timeline */ + TimeLineID RecoveryTargetTLI; /* * timestamp of when we started replaying the current chunk of WAL data, * only relevant for replication or archive recovery */ TimestampTz currentChunkStartTime; - /* end of the last record restored from the archive */ - XLogRecPtr restoreLastRecPtr; /* Are we requested to pause recovery? */ bool recoveryPause; @@ -2817,18 +2816,6 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, if (reload) WalSndRqstFileReload(); - /* - * Calculate the end location of the restored WAL file and save it in - * shmem. It's used as current standby flush position, and cascading - * walsenders try to send WAL records up to this location. 
- */ - XLogSegNoOffsetToRecPtr(segno, 0, endptr); - XLByteAdvance(endptr, XLogSegSize); - - SpinLockAcquire(&xlogctl->info_lck); - xlogctl->restoreLastRecPtr = endptr; - SpinLockRelease(&xlogctl->info_lck); - /* Signal walsender that new WAL has arrived */ if (AllowCascadeReplication()) WalSndWakeup(); @@ -4470,12 +4457,17 @@ rescanLatestTimeLine(void) ThisTimeLineID))); else { + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; + /* Switch target */ recoveryTargetTLI = newtarget; list_free(expectedTLIs); expectedTLIs = newExpectedTLIs; - XLogCtl->RecoveryTargetTLI = recoveryTargetTLI; + SpinLockAcquire(&xlogctl->info_lck); + xlogctl->RecoveryTargetTLI = recoveryTargetTLI; + SpinLockRelease(&xlogctl->info_lck); ereport(LOG, (errmsg("new target timeline is %u", @@ -7513,13 +7505,20 @@ GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch) } /* - * GetRecoveryTargetTLI - get the recovery target timeline ID + * GetRecoveryTargetTLI - get the current recovery target timeline ID */ TimeLineID GetRecoveryTargetTLI(void) { - /* RecoveryTargetTLI doesn't change so we need no lock to copy it */ - return XLogCtl->RecoveryTargetTLI; + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; + TimeLineID result; + + SpinLockAcquire(&xlogctl->info_lck); + result = xlogctl->RecoveryTargetTLI; + SpinLockRelease(&xlogctl->info_lck); + + return result; } /* @@ -8309,7 +8308,7 @@ CreateRestartPoint(int flags) XLogRecPtr endptr; /* Get the current (or recent) end of xlog */ - endptr = GetStandbyFlushRecPtr(); + endptr = GetStandbyFlushRecPtr(NULL); KeepLogSeg(endptr, &_logSegNo); _logSegNo--; @@ -9818,14 +9817,13 @@ do_pg_abort_backup(void) /* * Get latest redo apply position. * - * Optionally, returns the end byte position of the last restored - * WAL segment. Callers not interested in that value may pass - * NULL for restoreLastRecPtr. 
+ * Optionally, returns the current recovery target timeline. Callers not + * interested in that may pass NULL for targetTLI. * * Exported to allow WALReceiver to read the pointer directly. */ XLogRecPtr -GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr) +GetXLogReplayRecPtr(TimeLineID *targetTLI) { /* use volatile pointer to prevent code rearrangement */ volatile XLogCtlData *xlogctl = XLogCtl; @@ -9833,8 +9831,8 @@ GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr) SpinLockAcquire(&xlogctl->info_lck); recptr = xlogctl->recoveryLastRecPtr; - if (restoreLastRecPtr) - *restoreLastRecPtr = xlogctl->restoreLastRecPtr; + if (targetTLI) + *targetTLI = xlogctl->RecoveryTargetTLI; SpinLockRelease(&xlogctl->info_lck); return recptr; @@ -9843,21 +9841,23 @@ GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr) /* * Get current standby flush position, ie, the last WAL position * known to be fsync'd to disk in standby. + * + * If 'targetTLI' is not NULL, it's set to the current recovery target + * timeline. */ XLogRecPtr -GetStandbyFlushRecPtr(void) +GetStandbyFlushRecPtr(TimeLineID *targetTLI) { XLogRecPtr receivePtr; XLogRecPtr replayPtr; - XLogRecPtr restorePtr; receivePtr = GetWalRcvWriteRecPtr(NULL); - replayPtr = GetXLogReplayRecPtr(&restorePtr); + replayPtr = GetXLogReplayRecPtr(targetTLI); if (XLByteLT(receivePtr, replayPtr)) - return XLByteLT(replayPtr, restorePtr) ? restorePtr : replayPtr; + return replayPtr; else - return XLByteLT(receivePtr, restorePtr) ? restorePtr : receivePtr; + return receivePtr; } /* diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 38f7a3f1c3..cc27848318 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -303,7 +303,7 @@ IdentifySystem(void) GetSystemIdentifier()); snprintf(tli, sizeof(tli), "%u", ThisTimeLineID); - logptr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetInsertRecPtr(); + logptr = am_cascading_walsender ? 
GetStandbyFlushRecPtr(NULL) : GetInsertRecPtr(); snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr); @@ -1137,7 +1137,31 @@ XLogSend(char *msgbuf, bool *caughtup) * subsequently crashes and restarts, slaves must not have applied any WAL * that gets lost on the master. */ - SendRqstPtr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetFlushRecPtr(); + if (am_cascading_walsender) + { + TimeLineID currentTargetTLI; + SendRqstPtr = GetStandbyFlushRecPtr(&currentTargetTLI); + + /* + * If the recovery target timeline changed, bail out. It's a bit + * unfortunate that we have to just disconnect, but there is no way + * to tell the client that the timeline changed. We also don't know + * exactly where the switch happened, so we cannot safely try to send + * up to the switchover point before disconnecting. + */ + if (currentTargetTLI != ThisTimeLineID) + { + if (!walsender_ready_to_stop) + ereport(LOG, + (errmsg("terminating walsender process to force cascaded standby " + "to update timeline and reconnect"))); + walsender_ready_to_stop = true; + *caughtup = true; + return; + } + } + else + SendRqstPtr = GetFlushRecPtr(); /* Quick exit if nothing to do */ if (XLByteLE(SendRqstPtr, sentPtr)) diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index ec79870e74..2893f3b352 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -285,8 +285,8 @@ extern bool RecoveryInProgress(void); extern bool HotStandbyActive(void); extern bool XLogInsertAllowed(void); extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream); -extern XLogRecPtr GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr); -extern XLogRecPtr GetStandbyFlushRecPtr(void); +extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *targetTLI); +extern XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *targetTLI); extern XLogRecPtr GetXLogInsertRecPtr(void); extern XLogRecPtr GetXLogWriteRecPtr(void); extern bool RecoveryIsPaused(void);