diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index f6e7fa71d8..ef4b5f639c 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1373,11 +1373,7 @@ ReadTwoPhaseFile(TransactionId xid, bool missing_ok) * twophase files and ReadTwoPhaseFile should be used instead. * * Note clearly that this function can access WAL during normal operation, - * similarly to the way WALSender or Logical Decoding would do. While - * accessing WAL, read_local_xlog_page() may change ThisTimeLineID, - * particularly if this routine is called for the end-of-recovery checkpoint - * in the checkpointer itself, so save the current timeline number value - * and restore it once done. + * similarly to the way WALSender or Logical Decoding would do. */ static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) @@ -1385,7 +1381,6 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) XLogRecord *record; XLogReaderState *xlogreader; char *errormsg; - TimeLineID save_currtli = ThisTimeLineID; xlogreader = XLogReaderAllocate(wal_segment_size, NULL, XL_ROUTINE(.page_read = &read_local_xlog_page, @@ -1401,13 +1396,6 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) XLogBeginRead(xlogreader, lsn); record = XLogReadRecord(xlogreader, &errormsg); - /* - * Restore immediately the timeline where it was previously, as - * read_local_xlog_page() could have changed it if the record was read - * while recovery was finishing or if the timeline has jumped in-between. - */ - ThisTimeLineID = save_currtli; - if (record == NULL) ereport(ERROR, (errcode_for_file_access(), diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 0a0771a18e..9b15735921 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -192,7 +192,7 @@ CheckpointStatsData CheckpointStats; * ThisTimeLineID will be same in all backends --- it identifies current * WAL timeline for the database system. */ -TimeLineID ThisTimeLineID = 0; +static TimeLineID ThisTimeLineID = 0; static XLogRecPtr LastRec; @@ -917,7 +917,8 @@ static void AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic); static bool XLogCheckpointNeeded(XLogSegNo new_segno); static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible); static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, - bool find_free, XLogSegNo max_segno); + bool find_free, XLogSegNo max_segno, + TimeLineID tli); static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, XLogSource source, bool notfoundOk); static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source); @@ -2518,7 +2519,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible) wal_segment_size); /* create/use new log file */ - openLogFile = XLogFileInit(openLogSegNo); + openLogFile = XLogFileInit(openLogSegNo, ThisTimeLineID); ReserveExternalFD(); } @@ -2633,7 +2634,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible) */ if (finishing_seg) { - issue_xlog_fsync(openLogFile, openLogSegNo); + issue_xlog_fsync(openLogFile, openLogSegNo, ThisTimeLineID); /* signal that we need to wakeup walsenders later */ WalSndWakeupRequest(); @@ -2641,7 +2642,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible) LogwrtResult.Flush = LogwrtResult.Write; /* end of page */ if (XLogArchivingActive()) - XLogArchiveNotifySeg(openLogSegNo); + XLogArchiveNotifySeg(openLogSegNo, ThisTimeLineID); XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL); XLogCtl->lastSegSwitchLSN = LogwrtResult.Flush; @@ -2704,7 +2705,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible) ReserveExternalFD(); } - issue_xlog_fsync(openLogFile, openLogSegNo); + issue_xlog_fsync(openLogFile, openLogSegNo, ThisTimeLineID); } /* signal that we need to wakeup walsenders later */ @@ -3296,7 +3297,8 @@ XLogNeedsFlush(XLogRecPtr record) * succeed. (This is weird, but it's efficient for the callers.) */ static int -XLogFileInitInternal(XLogSegNo logsegno, bool *added, char *path) +XLogFileInitInternal(XLogSegNo logsegno, TimeLineID logtli, + bool *added, char *path) { char tmppath[MAXPGPATH]; PGAlignedXLogBlock zbuffer; @@ -3305,7 +3307,9 @@ XLogFileInitInternal(XLogSegNo logsegno, bool *added, char *path) int fd; int save_errno; - XLogFilePath(path, ThisTimeLineID, logsegno, wal_segment_size); + Assert(logtli != 0); + + XLogFilePath(path, logtli, logsegno, wal_segment_size); /* * Try to use existent file (checkpoint maker may have created it already) @@ -3449,7 +3453,8 @@ XLogFileInitInternal(XLogSegNo logsegno, bool *added, char *path) * CheckPointSegments. */ max_segno = logsegno + CheckPointSegments; - if (InstallXLogFileSegment(&installed_segno, tmppath, true, max_segno)) + if (InstallXLogFileSegment(&installed_segno, tmppath, true, max_segno, + logtli)) { *added = true; elog(DEBUG2, "done creating and filling new WAL file"); @@ -3481,13 +3486,15 @@ XLogFileInitInternal(XLogSegNo logsegno, bool *added, char *path) * in a critical section. */ int -XLogFileInit(XLogSegNo logsegno) +XLogFileInit(XLogSegNo logsegno, TimeLineID logtli) { bool ignore_added; char path[MAXPGPATH]; int fd; - fd = XLogFileInitInternal(logsegno, &ignore_added, path); + Assert(logtli != 0); + + fd = XLogFileInitInternal(logsegno, logtli, &ignore_added, path); if (fd >= 0) return fd; @@ -3629,7 +3636,7 @@ XLogFileCopy(XLogSegNo destsegno, TimeLineID srcTLI, XLogSegNo srcsegno, /* * Now move the segment into place with its final name. */ - if (!InstallXLogFileSegment(&destsegno, tmppath, false, 0)) + if (!InstallXLogFileSegment(&destsegno, tmppath, false, 0, ThisTimeLineID)) elog(ERROR, "InstallXLogFileSegment should not have failed"); } @@ -3653,18 +3660,22 @@ XLogFileCopy(XLogSegNo destsegno, TimeLineID srcTLI, XLogSegNo srcsegno, * free slot is found between *segno and max_segno. (Ignored when find_free * is false.) * + * tli: The timeline on which the new segment should be installed. + * * Returns true if the file was installed successfully. false indicates that * max_segno limit was exceeded, the startup process has disabled this * function for now, or an error occurred while renaming the file into place. */ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, - bool find_free, XLogSegNo max_segno) + bool find_free, XLogSegNo max_segno, TimeLineID tli) { char path[MAXPGPATH]; struct stat stat_buf; - XLogFilePath(path, ThisTimeLineID, *segno, wal_segment_size); + Assert(tli != 0); + + XLogFilePath(path, tli, *segno, wal_segment_size); LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); if (!XLogCtl->InstallXLogFileSegmentActive) @@ -3690,7 +3701,7 @@ InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, return false; } (*segno)++; - XLogFilePath(path, ThisTimeLineID, *segno, wal_segment_size); + XLogFilePath(path, tli, *segno, wal_segment_size); } } @@ -3987,7 +3998,7 @@ PreallocXlogFiles(XLogRecPtr endptr) if (offset >= (uint32) (0.75 * wal_segment_size)) { _logSegNo++; - lf = XLogFileInitInternal(_logSegNo, &added, path); + lf = XLogFileInitInternal(_logSegNo, ThisTimeLineID, &added, path); if (lf >= 0) close(lf); if (added) @@ -4266,7 +4277,7 @@ RemoveXlogFile(const char *segname, XLogSegNo recycleSegNo, XLogCtl->InstallXLogFileSegmentActive && /* callee rechecks this */ lstat(path, &statbuf) == 0 && S_ISREG(statbuf.st_mode) && InstallXLogFileSegment(endlogSegNo, path, - true, recycleSegNo)) + true, recycleSegNo, ThisTimeLineID)) { ereport(DEBUG2, (errmsg_internal("recycled write-ahead log file \"%s\"", @@ -5401,7 +5412,7 @@ BootStrapXLOG(void) record->xl_crc = crc; /* Create first XLOG segment file */ - openLogFile = XLogFileInit(1); + openLogFile = XLogFileInit(1, ThisTimeLineID); /* * We needn't bother with Reserve/ReleaseExternalFD here, since we'll @@ -5709,7 +5720,7 @@ exitArchiveRecovery(TimeLineID endTLI, XLogRecPtr endOfLog) */ int fd; - fd = XLogFileInit(startLogSegNo); + fd = XLogFileInit(startLogSegNo, ThisTimeLineID); if (close(fd) != 0) { @@ -8706,15 +8717,35 @@ GetInsertRecPtr(void) * position known to be fsync'd to disk. */ XLogRecPtr -GetFlushRecPtr(void) +GetFlushRecPtr(TimeLineID *insertTLI) { SpinLockAcquire(&XLogCtl->info_lck); LogwrtResult = XLogCtl->LogwrtResult; SpinLockRelease(&XLogCtl->info_lck); + /* + * If we're writing and flushing WAL, the time line can't be changing, + * so no lock is required. + */ + if (insertTLI) + *insertTLI = XLogCtl->ThisTimeLineID; + return LogwrtResult.Flush; } +/* + * GetWALInsertionTimeLine -- Returns the current timeline of a system that + * is not in recovery. + */ +TimeLineID +GetWALInsertionTimeLine(void) +{ + Assert(XLogCtl->SharedRecoveryState == RECOVERY_STATE_DONE); + + /* Since the value can't be changing, no lock is required. */ + return XLogCtl->ThisTimeLineID; +} + /* * GetLastImportantRecPtr -- Returns the LSN of the last important record * inserted. All records not explicitly marked as unimportant are considered @@ -10849,11 +10880,13 @@ assign_xlog_sync_method(int new_sync_method, void *extra) * 'segno' is for error reporting purposes. */ void -issue_xlog_fsync(int fd, XLogSegNo segno) +issue_xlog_fsync(int fd, XLogSegNo segno, TimeLineID tli) { char *msg = NULL; instr_time start; + Assert(tli != 0); + /* * Quick exit if fsync is disabled or write() has already synced the WAL * file. @@ -10902,8 +10935,7 @@ issue_xlog_fsync(int fd, XLogSegNo segno) char xlogfname[MAXFNAMELEN]; int save_errno = errno; - XLogFileName(xlogfname, ThisTimeLineID, segno, - wal_segment_size); + XLogFileName(xlogfname, tli, segno, wal_segment_size); errno = save_errno; ereport(PANIC, (errcode_for_file_access(), diff --git a/src/backend/access/transam/xlogarchive.c b/src/backend/access/transam/xlogarchive.c index 26b023e754..7d56dad0de 100644 --- a/src/backend/access/transam/xlogarchive.c +++ b/src/backend/access/transam/xlogarchive.c @@ -498,11 +498,13 @@ XLogArchiveNotify(const char *xlog) * Convenience routine to notify using segment number representation of filename */ void -XLogArchiveNotifySeg(XLogSegNo segno) +XLogArchiveNotifySeg(XLogSegNo segno, TimeLineID tli) { char xlog[MAXFNAMELEN]; - XLogFileName(xlog, ThisTimeLineID, segno, wal_segment_size); + Assert(tli != 0); + + XLogFileName(xlog, tli, segno, wal_segment_size); XLogArchiveNotify(xlog); } diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c index b98deb72ec..dd9a45c186 100644 --- a/src/backend/access/transam/xlogfuncs.c +++ b/src/backend/access/transam/xlogfuncs.c @@ -382,7 +382,7 @@ pg_current_wal_flush_lsn(PG_FUNCTION_ARGS) errmsg("recovery is in progress"), errhint("WAL control functions cannot be executed during recovery."))); - current_recptr = GetFlushRecPtr(); + current_recptr = GetFlushRecPtr(NULL); PG_RETURN_LSN(current_recptr); } @@ -469,7 +469,8 @@ pg_walfile_name_offset(PG_FUNCTION_ARGS) * xlogfilename */ XLByteToPrevSeg(locationpoint, xlogsegno, wal_segment_size); - XLogFileName(xlogfilename, ThisTimeLineID, xlogsegno, wal_segment_size); + XLogFileName(xlogfilename, GetWALInsertionTimeLine(), xlogsegno, + wal_segment_size); values[0] = CStringGetTextDatum(xlogfilename); isnull[0] = false; @@ -511,7 +512,8 @@ pg_walfile_name(PG_FUNCTION_ARGS) "pg_walfile_name()"))); XLByteToPrevSeg(locationpoint, xlogsegno, wal_segment_size); - XLogFileName(xlogfilename, ThisTimeLineID, xlogsegno, wal_segment_size); + XLogFileName(xlogfilename, GetWALInsertionTimeLine(), xlogsegno, + wal_segment_size); PG_RETURN_TEXT_P(cstring_to_text(xlogfilename)); } diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 88a1bfd939..b33e0531ed 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -678,6 +678,10 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum, * wantLength to the amount of the page that will be read, up to * XLOG_BLCKSZ. If the amount to be read isn't known, pass XLOG_BLCKSZ. * + * The currTLI argument should be the system-wide current timeline. + * Note that this may be different from state->currTLI, which is the timeline + * from which the caller is currently reading previous xlog records. + * * We switch to an xlog segment from the new timeline eagerly when on a * historical timeline, as soon as we reach the start of the xlog segment * containing the timeline switch. The server copied the segment to the new @@ -699,12 +703,11 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum, * * The caller must also make sure it doesn't read past the current replay * position (using GetXLogReplayRecPtr) if executing in recovery, so it - * doesn't fail to notice that the current timeline became historical. The - * caller must also update ThisTimeLineID with the result of - * GetXLogReplayRecPtr and must check RecoveryInProgress(). + * doesn't fail to notice that the current timeline became historical. */ void -XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength) +XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, + uint32 wantLength, TimeLineID currTLI) { const XLogRecPtr lastReadPage = (state->seg.ws_segno * state->segcxt.ws_segsize + state->segoff); @@ -712,6 +715,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0); Assert(wantLength <= XLOG_BLCKSZ); Assert(state->readLen == 0 || state->readLen <= XLOG_BLCKSZ); + Assert(currTLI != 0); /* * If the desired page is currently read in and valid, we have nothing to @@ -732,12 +736,12 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa * just carry on. (Seeking backwards requires a check to make sure the * older page isn't on a prior timeline). * - * ThisTimeLineID might've become historical since we last looked, but the - * caller is required not to read past the flush limit it saw at the time - * it looked up the timeline. There's nothing we can do about it if - * StartupXLOG() renames it to .partial concurrently. + * currTLI might've become historical since the caller obtained the value, + * but the caller is required not to read past the flush limit it saw at + * the time it looked up the timeline. There's nothing we can do about it + * if StartupXLOG() renames it to .partial concurrently. */ - if (state->currTLI == ThisTimeLineID && wantPage >= lastReadPage) + if (state->currTLI == currTLI && wantPage >= lastReadPage) { Assert(state->currTLIValidUntil == InvalidXLogRecPtr); return; @@ -749,7 +753,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa * the current segment we can just keep reading. */ if (state->currTLIValidUntil != InvalidXLogRecPtr && - state->currTLI != ThisTimeLineID && + state->currTLI != currTLI && state->currTLI != 0 && ((wantPage + wantLength) / state->segcxt.ws_segsize) < (state->currTLIValidUntil / state->segcxt.ws_segsize)) @@ -772,7 +776,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa * We need to re-read the timeline history in case it's been changed * by a promotion or replay from a cascaded replica. */ - List *timelineHistory = readTimeLineHistory(ThisTimeLineID); + List *timelineHistory = readTimeLineHistory(currTLI); XLogRecPtr endOfSegment; endOfSegment = ((wantPage / state->segcxt.ws_segsize) + 1) * @@ -853,6 +857,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, TimeLineID tli; int count; WALReadError errinfo; + TimeLineID currTLI; loc = targetPagePtr + reqLen; @@ -862,16 +867,12 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, /* * Determine the limit of xlog we can currently read to, and what the * most recent timeline is. - * - * RecoveryInProgress() will update ThisTimeLineID when it first - * notices recovery finishes, so we only have to maintain it for the - * local process until recovery ends. */ if (!RecoveryInProgress()) - read_upto = GetFlushRecPtr(); + read_upto = GetFlushRecPtr(&currTLI); else - read_upto = GetXLogReplayRecPtr(&ThisTimeLineID); - tli = ThisTimeLineID; + read_upto = GetXLogReplayRecPtr(&currTLI); + tli = currTLI; /* * Check which timeline to get the record from. @@ -890,16 +891,16 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, * archive in the timeline will get renamed to .partial by * StartupXLOG(). * - * If that happens after our caller updated ThisTimeLineID but before + * If that happens after our caller determined the TLI but before * we actually read the xlog page, we might still try to read from the * old (now renamed) segment and fail. There's not much we can do * about this, but it can only happen when we're a leaf of a cascading * standby whose primary gets promoted while we're decoding, so a * one-off ERROR isn't too bad. */ - XLogReadDetermineTimeline(state, targetPagePtr, reqLen); + XLogReadDetermineTimeline(state, targetPagePtr, reqLen, tli); - if (state->currTLI == ThisTimeLineID) + if (state->currTLI == currTLI) { if (loc <= read_upto) diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 5a7fae4a87..2609a0a710 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -211,7 +211,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin * Compute the current end-of-wal. */ if (!RecoveryInProgress()) - end_of_wal = GetFlushRecPtr(); + end_of_wal = GetFlushRecPtr(NULL); else end_of_wal = GetXLogReplayRecPtr(NULL); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 8d96c926b4..0bd5d0ee5e 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -2435,7 +2435,7 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes) { dlist_mutable_iter iter; - XLogRecPtr local_flush = GetFlushRecPtr(); + XLogRecPtr local_flush = GetFlushRecPtr(NULL); *write = InvalidXLogRecPtr; *flush = InvalidXLogRecPtr; diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 877a006d50..a80298ba53 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -625,7 +625,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS) * target position accordingly. */ if (!RecoveryInProgress()) - moveto = Min(moveto, GetFlushRecPtr()); + moveto = Min(moveto, GetFlushRecPtr(NULL)); else moveto = Min(moveto, GetXLogReplayRecPtr(NULL)); diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index b90e5ca98e..7a7eb3784e 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -122,10 +122,12 @@ static StringInfoData incoming_message; static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last); static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI); static void WalRcvDie(int code, Datum arg); -static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len); -static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); -static void XLogWalRcvFlush(bool dying); -static void XLogWalRcvClose(XLogRecPtr recptr); +static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, + TimeLineID tli); +static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, + TimeLineID tli); +static void XLogWalRcvFlush(bool dying, TimeLineID tli); +static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli); static void XLogWalRcvSendReply(bool force, bool requestReply); static void XLogWalRcvSendHSFeedback(bool immed); static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); @@ -255,7 +257,7 @@ WalReceiverMain(void) pg_atomic_write_u64(&WalRcv->writtenUpto, 0); /* Arrange to clean up at walreceiver exit */ - on_shmem_exit(WalRcvDie, 0); + on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI)); /* Properly accept or ignore signals the postmaster might send us */ pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config @@ -394,7 +396,6 @@ WalReceiverMain(void) options.startpoint = startpoint; options.slotname = slotname[0] != '\0' ? slotname : NULL; options.proto.physical.startpointTLI = startpointTLI; - ThisTimeLineID = startpointTLI; if (walrcv_startstreaming(wrconn, &options)) { if (first_stream) @@ -462,7 +463,8 @@ WalReceiverMain(void) */ last_recv_timestamp = GetCurrentTimestamp(); ping_sent = false; - XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1); + XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1, + startpointTLI); } else if (len == 0) break; @@ -487,7 +489,7 @@ WalReceiverMain(void) * let the startup process and primary server know about * them. */ - XLogWalRcvFlush(false); + XLogWalRcvFlush(false, startpointTLI); } /* Check if we need to exit the streaming loop. */ @@ -608,7 +610,7 @@ WalReceiverMain(void) { char xlogfname[MAXFNAMELEN]; - XLogWalRcvFlush(false); + XLogWalRcvFlush(false, startpointTLI); XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size); if (close(recvFile) != 0) ereport(PANIC, @@ -776,9 +778,12 @@ static void WalRcvDie(int code, Datum arg) { WalRcvData *walrcv = WalRcv; + TimeLineID *startpointTLI_p = (TimeLineID *) DatumGetPointer(arg); + + Assert(*startpointTLI_p != 0); /* Ensure that all WAL records received are flushed to disk */ - XLogWalRcvFlush(true); + XLogWalRcvFlush(true, *startpointTLI_p); /* Mark ourselves inactive in shared memory */ SpinLockAcquire(&walrcv->mutex); @@ -808,7 +813,7 @@ WalRcvDie(int code, Datum arg) * Accept the message from XLOG stream, and process it. */ static void -XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) +XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli) { int hdrlen; XLogRecPtr dataStart; @@ -838,7 +843,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) buf += hdrlen; len -= hdrlen; - XLogWalRcvWrite(buf, len, dataStart); + XLogWalRcvWrite(buf, len, dataStart, tli); break; } case 'k': /* Keepalive */ @@ -875,25 +880,27 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) * Write XLOG data to disk. */ static void -XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) +XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli) { int startoff; int byteswritten; + Assert(tli != 0); + while (nbytes > 0) { int segbytes; /* Close the current segment if it's completed */ if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size)) - XLogWalRcvClose(recptr); + XLogWalRcvClose(recptr, tli); if (recvFile < 0) { /* Create/use new log file */ XLByteToSeg(recptr, recvSegNo, wal_segment_size); - recvFile = XLogFileInit(recvSegNo); - recvFileTLI = ThisTimeLineID; + recvFile = XLogFileInit(recvSegNo, tli); + recvFileTLI = tli; } /* Calculate the start offset of the received logs */ @@ -946,7 +953,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) * segment is received and written. */ if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size)) - XLogWalRcvClose(recptr); + XLogWalRcvClose(recptr, tli); } /* @@ -956,13 +963,15 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) * an error, so we skip sending a reply in that case. */ static void -XLogWalRcvFlush(bool dying) +XLogWalRcvFlush(bool dying, TimeLineID tli) { + Assert(tli != 0); + if (LogstreamResult.Flush < LogstreamResult.Write) { WalRcvData *walrcv = WalRcv; - issue_xlog_fsync(recvFile, recvSegNo); + issue_xlog_fsync(recvFile, recvSegNo, tli); LogstreamResult.Flush = LogstreamResult.Write; @@ -972,7 +981,7 @@ XLogWalRcvFlush(bool dying) { walrcv->latestChunkStart = walrcv->flushedUpto; walrcv->flushedUpto = LogstreamResult.Flush; - walrcv->receivedTLI = ThisTimeLineID; + walrcv->receivedTLI = tli; } SpinLockRelease(&walrcv->mutex); @@ -1009,17 +1018,18 @@ XLogWalRcvFlush(bool dying) * Create an archive notification file since the segment is known completed. */ static void -XLogWalRcvClose(XLogRecPtr recptr) +XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli) { char xlogfname[MAXFNAMELEN]; Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size)); + Assert(tli != 0); /* * fsync() and close current file before we switch to next one. We would * otherwise have to reopen this file to fsync it later */ - XLogWalRcvFlush(false); + XLogWalRcvFlush(false, tli); XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index d9ab6d6de2..fff7dfc640 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -230,7 +230,7 @@ static void WalSndShutdown(void) pg_attribute_noreturn(); static void XLogSendPhysical(void); static void XLogSendLogical(void); static void WalSndDone(WalSndSendDataCallback send_data); -static XLogRecPtr GetStandbyFlushRecPtr(void); +static XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli); static void IdentifySystem(void); static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd); static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd); @@ -385,6 +385,7 @@ IdentifySystem(void) TupleDesc tupdesc; Datum values[4]; bool nulls[4]; + TimeLineID currTLI; /* * Reply with a result set with one row, four columns. First col is system @@ -397,12 +398,9 @@ IdentifySystem(void) am_cascading_walsender = RecoveryInProgress(); if (am_cascading_walsender) - { - /* this also updates ThisTimeLineID */ - logptr = GetStandbyFlushRecPtr(); - } + logptr = GetStandbyFlushRecPtr(&currTLI); else - logptr = GetFlushRecPtr(); + logptr = GetFlushRecPtr(&currTLI); snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr)); @@ -441,7 +439,7 @@ IdentifySystem(void) values[0] = CStringGetTextDatum(sysid); /* column 2: timeline */ - values[1] = Int32GetDatum(ThisTimeLineID); + values[1] = Int32GetDatum(currTLI); /* column 3: wal location */ values[2] = CStringGetTextDatum(xloc); @@ -537,7 +535,7 @@ ReadReplicationSlot(ReadReplicationSlotCmd *cmd) if (RecoveryInProgress()) (void) GetXLogReplayRecPtr(¤t_timeline); else - current_timeline = ThisTimeLineID; + current_timeline = GetWALInsertionTimeLine(); timeline_history = readTimeLineHistory(current_timeline); slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn, @@ -671,6 +669,7 @@ StartReplication(StartReplicationCmd *cmd) { StringInfoData buf; XLogRecPtr FlushPtr; + TimeLineID FlushTLI; /* create xlogreader for physical replication */ xlogreader = @@ -710,24 +709,20 @@ StartReplication(StartReplicationCmd *cmd) /* * Select the timeline. If it was given explicitly by the client, use - * that. Otherwise use the timeline of the last replayed record, which is - * kept in ThisTimeLineID. + * that. Otherwise use the timeline of the last replayed record. */ am_cascading_walsender = RecoveryInProgress(); if (am_cascading_walsender) - { - /* this also updates ThisTimeLineID */ - FlushPtr = GetStandbyFlushRecPtr(); - } + FlushPtr = GetStandbyFlushRecPtr(&FlushTLI); else - FlushPtr = GetFlushRecPtr(); + FlushPtr = GetFlushRecPtr(&FlushTLI); if (cmd->timeline != 0) { XLogRecPtr switchpoint; sendTimeLine = cmd->timeline; - if (sendTimeLine == ThisTimeLineID) + if (sendTimeLine == FlushTLI) { sendTimeLineIsHistoric = false; sendTimeLineValidUpto = InvalidXLogRecPtr; @@ -742,7 +737,7 @@ StartReplication(StartReplicationCmd *cmd) * Check that the timeline the client requested exists, and the * requested start location is on that timeline. */ - timeLineHistory = readTimeLineHistory(ThisTimeLineID); + timeLineHistory = readTimeLineHistory(FlushTLI); switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory, &sendTimeLineNextTLI); list_free_deep(timeLineHistory); @@ -781,7 +776,7 @@ StartReplication(StartReplicationCmd *cmd) } else { - sendTimeLine = ThisTimeLineID; + sendTimeLine = FlushTLI; sendTimeLineValidUpto = InvalidXLogRecPtr; sendTimeLineIsHistoric = false; } @@ -909,9 +904,16 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req int count; WALReadError errinfo; XLogSegNo segno; + TimeLineID currTLI = GetWALInsertionTimeLine(); - XLogReadDetermineTimeline(state, targetPagePtr, reqLen); - sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID); + /* + * Since logical decoding is only permitted on a primary server, we know + * that the current timeline ID can't be changing any more. If we did this + * on a standby, we'd have to worry about the values we compute here + * becoming invalid due to a promotion or timeline change. + */ + XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI); + sendTimeLineIsHistoric = (state->currTLI != currTLI); sendTimeLine = state->currTLI; sendTimeLineValidUpto = state->currTLIValidUntil; sendTimeLineNextTLI = state->nextTLI; @@ -1487,7 +1489,7 @@ WalSndWaitForWal(XLogRecPtr loc) /* Get a more recent flush pointer. */ if (!RecoveryInProgress()) - RecentFlushPtr = GetFlushRecPtr(); + RecentFlushPtr = GetFlushRecPtr(NULL); else RecentFlushPtr = GetXLogReplayRecPtr(NULL); @@ -1521,7 +1523,7 @@ WalSndWaitForWal(XLogRecPtr loc) /* Update our idea of the currently flushed position. */ if (!RecoveryInProgress()) - RecentFlushPtr = GetFlushRecPtr(); + RecentFlushPtr = GetFlushRecPtr(NULL); else RecentFlushPtr = GetXLogReplayRecPtr(NULL); @@ -2683,6 +2685,8 @@ XLogSendPhysical(void) } else if (am_cascading_walsender) { + TimeLineID SendRqstTLI; + /* * Streaming the latest timeline on a standby. * @@ -2702,14 +2706,12 @@ XLogSendPhysical(void) */ bool becameHistoric = false; - SendRqstPtr = GetStandbyFlushRecPtr(); + SendRqstPtr = GetStandbyFlushRecPtr(&SendRqstTLI); if (!RecoveryInProgress()) { - /* - * We have been promoted. RecoveryInProgress() updated - * ThisTimeLineID to the new current timeline. - */ + /* We have been promoted. */ + SendRqstTLI = GetWALInsertionTimeLine(); am_cascading_walsender = false; becameHistoric = true; } @@ -2717,10 +2719,9 @@ XLogSendPhysical(void) { /* * Still a cascading standby. But is the timeline we're sending - * still the one recovery is recovering from? ThisTimeLineID was - * updated by the GetStandbyFlushRecPtr() call above. + * still the one recovery is recovering from? */ - if (sendTimeLine != ThisTimeLineID) + if (sendTimeLine != SendRqstTLI) becameHistoric = true; } @@ -2733,7 +2734,7 @@ XLogSendPhysical(void) */ List *history; - history = readTimeLineHistory(ThisTimeLineID); + history = readTimeLineHistory(SendRqstTLI); sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI); Assert(sendTimeLine < sendTimeLineNextTLI); @@ -2756,7 +2757,7 @@ XLogSendPhysical(void) * primary: if the primary subsequently crashes and restarts, standbys * must not have applied any WAL that got lost on the primary. */ - SendRqstPtr = GetFlushRecPtr(); + SendRqstPtr = GetFlushRecPtr(NULL); } /* @@ -2997,9 +2998,9 @@ XLogSendLogical(void) * we only need to update flushPtr if EndRecPtr is past it. */ if (flushPtr == InvalidXLogRecPtr) - flushPtr = GetFlushRecPtr(); + flushPtr = GetFlushRecPtr(NULL); else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr) - flushPtr = GetFlushRecPtr(); + flushPtr = GetFlushRecPtr(NULL); /* If EndRecPtr is still past our flushPtr, it means we caught up. */ if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr) @@ -3069,11 +3070,11 @@ WalSndDone(WalSndSendDataCallback send_data) * can be sent to the standby. This should only be called when in recovery, * ie. we're streaming to a cascaded standby. * - * As a side-effect, ThisTimeLineID is updated to the TLI of the last + * As a side-effect, *tli is updated to the TLI of the last * replayed WAL record. */ static XLogRecPtr -GetStandbyFlushRecPtr(void) +GetStandbyFlushRecPtr(TimeLineID *tli) { XLogRecPtr replayPtr; TimeLineID replayTLI; @@ -3090,10 +3091,10 @@ GetStandbyFlushRecPtr(void) receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI); replayPtr = GetXLogReplayRecPtr(&replayTLI); - ThisTimeLineID = replayTLI; + *tli = replayTLI; result = replayPtr; - if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr) + if (receiveTLI == replayTLI && receivePtr > replayPtr) result = receivePtr; return result; diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index c0a560204b..f188c41bed 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -29,8 +29,6 @@ #define SYNC_METHOD_OPEN_DSYNC 4 /* for O_DSYNC */ extern int sync_method; -extern PGDLLIMPORT TimeLineID ThisTimeLineID; /* current TLI */ - /* * Recovery target type. * Only set during a Point in Time recovery, not when in standby mode. @@ -262,7 +260,7 @@ extern XLogRecPtr XLogInsertRecord(struct XLogRecData *rdata, extern void XLogFlush(XLogRecPtr RecPtr); extern bool XLogBackgroundFlush(void); extern bool XLogNeedsFlush(XLogRecPtr RecPtr); -extern int XLogFileInit(XLogSegNo segno); +extern int XLogFileInit(XLogSegNo segno, TimeLineID tli); extern int XLogFileOpen(XLogSegNo segno); extern void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli); @@ -274,7 +272,7 @@ extern void xlog_redo(XLogReaderState *record); extern void xlog_desc(StringInfo buf, XLogReaderState *record); extern const char *xlog_identify(uint8 info); -extern void issue_xlog_fsync(int fd, XLogSegNo segno); +extern void issue_xlog_fsync(int fd, XLogSegNo segno, TimeLineID tli); extern bool RecoveryInProgress(void); extern RecoveryState GetRecoveryState(void); @@ -312,7 +310,8 @@ extern void UpdateFullPageWrites(void); extern void GetFullPageWriteInfo(XLogRecPtr *RedoRecPtr_p, bool *doPageWrites_p); extern XLogRecPtr GetRedoRecPtr(void); extern XLogRecPtr GetInsertRecPtr(void); -extern XLogRecPtr GetFlushRecPtr(void); +extern XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI); +extern TimeLineID GetWALInsertionTimeLine(void); extern XLogRecPtr GetLastImportantRecPtr(void); extern void RemovePromoteSignalFiles(void); diff --git a/src/include/access/xlogarchive.h b/src/include/access/xlogarchive.h index 3edd1a976c..7dcf1bd2dd 100644 --- a/src/include/access/xlogarchive.h +++ b/src/include/access/xlogarchive.h @@ -24,7 +24,7 @@ extern void ExecuteRecoveryCommand(const char *command, const char *commandName, bool failOnSignal); extern void KeepFileRestoredFromArchive(const char *path, const char *xlogfname); extern void XLogArchiveNotify(const char *xlog); -extern void XLogArchiveNotifySeg(XLogSegNo segno); +extern void XLogArchiveNotifySeg(XLogSegNo segno, TimeLineID tli); extern void XLogArchiveForceDone(const char *xlog); extern bool XLogArchiveCheckDone(const char *xlog); extern bool XLogArchiveIsBusy(const char *xlog); diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index a5cb3d322c..eebc91f3a5 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -98,7 +98,9 @@ extern void wal_segment_open(XLogReaderState *state, extern void wal_segment_close(XLogReaderState *state); extern void XLogReadDetermineTimeline(XLogReaderState *state, - XLogRecPtr wantPage, uint32 wantLength); + XLogRecPtr wantPage, + uint32 wantLength, + TimeLineID currTLI); extern void WALReadRaiseError(WALReadError *errinfo);