Remove all uses of the ThisTimeLineID global variable outside of xlog.c

All such code deals with this global variable in one of three ways.
Sometimes the same functions use it in more than one of these ways
at the same time.

First, sometimes it's an implicit argument to one or more functions
being called in xlog.c or elsewhere, and must be set to the
appropriate value before calling those functions lest they
misbehave. In those cases, it is now passed as an explicit argument
instead.

Second, sometimes it's used to obtain the current timeline after
the end of recovery, i.e. the timeline to which WAL is being
written and flushed. Such code now calls GetWALInsertionTimeLine()
or relies on the new out parameter added to GetFlushRecPtr().

Third, sometimes it's used during recovery to store the current
replay timeline. That can change, so such code must generally
update the value before each use. It can still do that, but must
now use a local variable instead.

The net effect of these changes is to substantially reduce the
amount of code that directly accesses this global variable.
That's good, because history has shown that we don't always think
clearly about which timeline ID it's supposed to contain at any
given point in time, or indeed, whether it has been or needs to
be initialized at any given point in the code.

Patch by me, reviewed and tested by Michael Paquier, Amul Sul, and
Álvaro Herrera.

Discussion: https://postgr.es/m/CA+TgmobfAAqhfWa1kaFBBFvX+5CjM=7TE=n4r4Q1o2bjbGYBpA@mail.gmail.com
This commit is contained in:
Robert Haas 2021-11-05 12:50:01 -04:00
parent caf1f675b8
commit e997a0c642
13 changed files with 169 additions and 132 deletions

View File

@ -1373,11 +1373,7 @@ ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
* twophase files and ReadTwoPhaseFile should be used instead.
*
* Note clearly that this function can access WAL during normal operation,
* similarly to the way WALSender or Logical Decoding would do. While
* accessing WAL, read_local_xlog_page() may change ThisTimeLineID,
* particularly if this routine is called for the end-of-recovery checkpoint
* in the checkpointer itself, so save the current timeline number value
* and restore it once done.
* similarly to the way WALSender or Logical Decoding would do.
*/
static void
XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
@ -1385,7 +1381,6 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
XLogRecord *record;
XLogReaderState *xlogreader;
char *errormsg;
TimeLineID save_currtli = ThisTimeLineID;
xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
XL_ROUTINE(.page_read = &read_local_xlog_page,
@ -1401,13 +1396,6 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
XLogBeginRead(xlogreader, lsn);
record = XLogReadRecord(xlogreader, &errormsg);
/*
* Restore immediately the timeline where it was previously, as
* read_local_xlog_page() could have changed it if the record was read
* while recovery was finishing or if the timeline has jumped in-between.
*/
ThisTimeLineID = save_currtli;
if (record == NULL)
ereport(ERROR,
(errcode_for_file_access(),

View File

@ -192,7 +192,7 @@ CheckpointStatsData CheckpointStats;
* ThisTimeLineID will be same in all backends --- it identifies current
* WAL timeline for the database system.
*/
TimeLineID ThisTimeLineID = 0;
static TimeLineID ThisTimeLineID = 0;
static XLogRecPtr LastRec;
@ -917,7 +917,8 @@ static void AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic);
static bool XLogCheckpointNeeded(XLogSegNo new_segno);
static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
bool find_free, XLogSegNo max_segno);
bool find_free, XLogSegNo max_segno,
TimeLineID tli);
static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
XLogSource source, bool notfoundOk);
static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source);
@ -2518,7 +2519,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
wal_segment_size);
/* create/use new log file */
openLogFile = XLogFileInit(openLogSegNo);
openLogFile = XLogFileInit(openLogSegNo, ThisTimeLineID);
ReserveExternalFD();
}
@ -2633,7 +2634,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
*/
if (finishing_seg)
{
issue_xlog_fsync(openLogFile, openLogSegNo);
issue_xlog_fsync(openLogFile, openLogSegNo, ThisTimeLineID);
/* signal that we need to wakeup walsenders later */
WalSndWakeupRequest();
@ -2641,7 +2642,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
LogwrtResult.Flush = LogwrtResult.Write; /* end of page */
if (XLogArchivingActive())
XLogArchiveNotifySeg(openLogSegNo);
XLogArchiveNotifySeg(openLogSegNo, ThisTimeLineID);
XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
XLogCtl->lastSegSwitchLSN = LogwrtResult.Flush;
@ -2704,7 +2705,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
ReserveExternalFD();
}
issue_xlog_fsync(openLogFile, openLogSegNo);
issue_xlog_fsync(openLogFile, openLogSegNo, ThisTimeLineID);
}
/* signal that we need to wakeup walsenders later */
@ -3296,7 +3297,8 @@ XLogNeedsFlush(XLogRecPtr record)
* succeed. (This is weird, but it's efficient for the callers.)
*/
static int
XLogFileInitInternal(XLogSegNo logsegno, bool *added, char *path)
XLogFileInitInternal(XLogSegNo logsegno, TimeLineID logtli,
bool *added, char *path)
{
char tmppath[MAXPGPATH];
PGAlignedXLogBlock zbuffer;
@ -3305,7 +3307,9 @@ XLogFileInitInternal(XLogSegNo logsegno, bool *added, char *path)
int fd;
int save_errno;
XLogFilePath(path, ThisTimeLineID, logsegno, wal_segment_size);
Assert(logtli != 0);
XLogFilePath(path, logtli, logsegno, wal_segment_size);
/*
* Try to use existent file (checkpoint maker may have created it already)
@ -3449,7 +3453,8 @@ XLogFileInitInternal(XLogSegNo logsegno, bool *added, char *path)
* CheckPointSegments.
*/
max_segno = logsegno + CheckPointSegments;
if (InstallXLogFileSegment(&installed_segno, tmppath, true, max_segno))
if (InstallXLogFileSegment(&installed_segno, tmppath, true, max_segno,
logtli))
{
*added = true;
elog(DEBUG2, "done creating and filling new WAL file");
@ -3481,13 +3486,15 @@ XLogFileInitInternal(XLogSegNo logsegno, bool *added, char *path)
* in a critical section.
*/
int
XLogFileInit(XLogSegNo logsegno)
XLogFileInit(XLogSegNo logsegno, TimeLineID logtli)
{
bool ignore_added;
char path[MAXPGPATH];
int fd;
fd = XLogFileInitInternal(logsegno, &ignore_added, path);
Assert(logtli != 0);
fd = XLogFileInitInternal(logsegno, logtli, &ignore_added, path);
if (fd >= 0)
return fd;
@ -3629,7 +3636,7 @@ XLogFileCopy(XLogSegNo destsegno, TimeLineID srcTLI, XLogSegNo srcsegno,
/*
* Now move the segment into place with its final name.
*/
if (!InstallXLogFileSegment(&destsegno, tmppath, false, 0))
if (!InstallXLogFileSegment(&destsegno, tmppath, false, 0, ThisTimeLineID))
elog(ERROR, "InstallXLogFileSegment should not have failed");
}
@ -3653,18 +3660,22 @@ XLogFileCopy(XLogSegNo destsegno, TimeLineID srcTLI, XLogSegNo srcsegno,
* free slot is found between *segno and max_segno. (Ignored when find_free
* is false.)
*
* tli: The timeline on which the new segment should be installed.
*
* Returns true if the file was installed successfully. false indicates that
* max_segno limit was exceeded, the startup process has disabled this
* function for now, or an error occurred while renaming the file into place.
*/
static bool
InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
bool find_free, XLogSegNo max_segno)
bool find_free, XLogSegNo max_segno, TimeLineID tli)
{
char path[MAXPGPATH];
struct stat stat_buf;
XLogFilePath(path, ThisTimeLineID, *segno, wal_segment_size);
Assert(tli != 0);
XLogFilePath(path, tli, *segno, wal_segment_size);
LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
if (!XLogCtl->InstallXLogFileSegmentActive)
@ -3690,7 +3701,7 @@ InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
return false;
}
(*segno)++;
XLogFilePath(path, ThisTimeLineID, *segno, wal_segment_size);
XLogFilePath(path, tli, *segno, wal_segment_size);
}
}
@ -3987,7 +3998,7 @@ PreallocXlogFiles(XLogRecPtr endptr)
if (offset >= (uint32) (0.75 * wal_segment_size))
{
_logSegNo++;
lf = XLogFileInitInternal(_logSegNo, &added, path);
lf = XLogFileInitInternal(_logSegNo, ThisTimeLineID, &added, path);
if (lf >= 0)
close(lf);
if (added)
@ -4266,7 +4277,7 @@ RemoveXlogFile(const char *segname, XLogSegNo recycleSegNo,
XLogCtl->InstallXLogFileSegmentActive && /* callee rechecks this */
lstat(path, &statbuf) == 0 && S_ISREG(statbuf.st_mode) &&
InstallXLogFileSegment(endlogSegNo, path,
true, recycleSegNo))
true, recycleSegNo, ThisTimeLineID))
{
ereport(DEBUG2,
(errmsg_internal("recycled write-ahead log file \"%s\"",
@ -5401,7 +5412,7 @@ BootStrapXLOG(void)
record->xl_crc = crc;
/* Create first XLOG segment file */
openLogFile = XLogFileInit(1);
openLogFile = XLogFileInit(1, ThisTimeLineID);
/*
* We needn't bother with Reserve/ReleaseExternalFD here, since we'll
@ -5709,7 +5720,7 @@ exitArchiveRecovery(TimeLineID endTLI, XLogRecPtr endOfLog)
*/
int fd;
fd = XLogFileInit(startLogSegNo);
fd = XLogFileInit(startLogSegNo, ThisTimeLineID);
if (close(fd) != 0)
{
@ -8706,15 +8717,35 @@ GetInsertRecPtr(void)
* position known to be fsync'd to disk.
*/
XLogRecPtr
GetFlushRecPtr(void)
GetFlushRecPtr(TimeLineID *insertTLI)
{
SpinLockAcquire(&XLogCtl->info_lck);
LogwrtResult = XLogCtl->LogwrtResult;
SpinLockRelease(&XLogCtl->info_lck);
/*
* If we're writing and flushing WAL, the time line can't be changing,
* so no lock is required.
*/
if (insertTLI)
*insertTLI = XLogCtl->ThisTimeLineID;
return LogwrtResult.Flush;
}
/*
* GetWALInsertionTimeLine -- Returns the current timeline of a system that
* is not in recovery.
*
* Takes no arguments and returns the timeline ID stored in shared memory
* (XLogCtl->ThisTimeLineID).
*
* Callers must only use this once recovery has finished: the precondition
* is checked solely by the assertion below (shared recovery state must be
* RECOVERY_STATE_DONE), so nothing guards against misuse when assertions
* are compiled out.
*/
TimeLineID
GetWALInsertionTimeLine(void)
{
/* Enforce the "not in recovery" precondition stated above. */
Assert(XLogCtl->SharedRecoveryState == RECOVERY_STATE_DONE);
/* Since the value can't be changing, no lock is required. */
return XLogCtl->ThisTimeLineID;
}
/*
* GetLastImportantRecPtr -- Returns the LSN of the last important record
* inserted. All records not explicitly marked as unimportant are considered
@ -10849,11 +10880,13 @@ assign_xlog_sync_method(int new_sync_method, void *extra)
* 'segno' and 'tli' are for error reporting purposes.
*/
void
issue_xlog_fsync(int fd, XLogSegNo segno)
issue_xlog_fsync(int fd, XLogSegNo segno, TimeLineID tli)
{
char *msg = NULL;
instr_time start;
Assert(tli != 0);
/*
* Quick exit if fsync is disabled or write() has already synced the WAL
* file.
@ -10902,8 +10935,7 @@ issue_xlog_fsync(int fd, XLogSegNo segno)
char xlogfname[MAXFNAMELEN];
int save_errno = errno;
XLogFileName(xlogfname, ThisTimeLineID, segno,
wal_segment_size);
XLogFileName(xlogfname, tli, segno, wal_segment_size);
errno = save_errno;
ereport(PANIC,
(errcode_for_file_access(),

View File

@ -498,11 +498,13 @@ XLogArchiveNotify(const char *xlog)
* Convenience routine to notify using segment number representation of filename
*/
void
XLogArchiveNotifySeg(XLogSegNo segno)
XLogArchiveNotifySeg(XLogSegNo segno, TimeLineID tli)
{
char xlog[MAXFNAMELEN];
XLogFileName(xlog, ThisTimeLineID, segno, wal_segment_size);
Assert(tli != 0);
XLogFileName(xlog, tli, segno, wal_segment_size);
XLogArchiveNotify(xlog);
}

View File

@ -382,7 +382,7 @@ pg_current_wal_flush_lsn(PG_FUNCTION_ARGS)
errmsg("recovery is in progress"),
errhint("WAL control functions cannot be executed during recovery.")));
current_recptr = GetFlushRecPtr();
current_recptr = GetFlushRecPtr(NULL);
PG_RETURN_LSN(current_recptr);
}
@ -469,7 +469,8 @@ pg_walfile_name_offset(PG_FUNCTION_ARGS)
* xlogfilename
*/
XLByteToPrevSeg(locationpoint, xlogsegno, wal_segment_size);
XLogFileName(xlogfilename, ThisTimeLineID, xlogsegno, wal_segment_size);
XLogFileName(xlogfilename, GetWALInsertionTimeLine(), xlogsegno,
wal_segment_size);
values[0] = CStringGetTextDatum(xlogfilename);
isnull[0] = false;
@ -511,7 +512,8 @@ pg_walfile_name(PG_FUNCTION_ARGS)
"pg_walfile_name()")));
XLByteToPrevSeg(locationpoint, xlogsegno, wal_segment_size);
XLogFileName(xlogfilename, ThisTimeLineID, xlogsegno, wal_segment_size);
XLogFileName(xlogfilename, GetWALInsertionTimeLine(), xlogsegno,
wal_segment_size);
PG_RETURN_TEXT_P(cstring_to_text(xlogfilename));
}

View File

@ -678,6 +678,10 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
* wantLength to the amount of the page that will be read, up to
* XLOG_BLCKSZ. If the amount to be read isn't known, pass XLOG_BLCKSZ.
*
* The currTLI argument should be the system-wide current timeline.
* Note that this may be different from state->currTLI, which is the timeline
* from which the caller is currently reading previous xlog records.
*
* We switch to an xlog segment from the new timeline eagerly when on a
* historical timeline, as soon as we reach the start of the xlog segment
* containing the timeline switch. The server copied the segment to the new
@ -699,12 +703,11 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
*
* The caller must also make sure it doesn't read past the current replay
* position (using GetXLogReplayRecPtr) if executing in recovery, so it
* doesn't fail to notice that the current timeline became historical. The
* caller must also update ThisTimeLineID with the result of
* GetXLogReplayRecPtr and must check RecoveryInProgress().
* doesn't fail to notice that the current timeline became historical.
*/
void
XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage,
uint32 wantLength, TimeLineID currTLI)
{
const XLogRecPtr lastReadPage = (state->seg.ws_segno *
state->segcxt.ws_segsize + state->segoff);
@ -712,6 +715,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
Assert(wantLength <= XLOG_BLCKSZ);
Assert(state->readLen == 0 || state->readLen <= XLOG_BLCKSZ);
Assert(currTLI != 0);
/*
* If the desired page is currently read in and valid, we have nothing to
@ -732,12 +736,12 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
* just carry on. (Seeking backwards requires a check to make sure the
* older page isn't on a prior timeline).
*
* ThisTimeLineID might've become historical since we last looked, but the
* caller is required not to read past the flush limit it saw at the time
* it looked up the timeline. There's nothing we can do about it if
* StartupXLOG() renames it to .partial concurrently.
* currTLI might've become historical since the caller obtained the value,
* but the caller is required not to read past the flush limit it saw at
* the time it looked up the timeline. There's nothing we can do about it
* if StartupXLOG() renames it to .partial concurrently.
*/
if (state->currTLI == ThisTimeLineID && wantPage >= lastReadPage)
if (state->currTLI == currTLI && wantPage >= lastReadPage)
{
Assert(state->currTLIValidUntil == InvalidXLogRecPtr);
return;
@ -749,7 +753,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
* the current segment we can just keep reading.
*/
if (state->currTLIValidUntil != InvalidXLogRecPtr &&
state->currTLI != ThisTimeLineID &&
state->currTLI != currTLI &&
state->currTLI != 0 &&
((wantPage + wantLength) / state->segcxt.ws_segsize) <
(state->currTLIValidUntil / state->segcxt.ws_segsize))
@ -772,7 +776,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
* We need to re-read the timeline history in case it's been changed
* by a promotion or replay from a cascaded replica.
*/
List *timelineHistory = readTimeLineHistory(ThisTimeLineID);
List *timelineHistory = readTimeLineHistory(currTLI);
XLogRecPtr endOfSegment;
endOfSegment = ((wantPage / state->segcxt.ws_segsize) + 1) *
@ -853,6 +857,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
TimeLineID tli;
int count;
WALReadError errinfo;
TimeLineID currTLI;
loc = targetPagePtr + reqLen;
@ -862,16 +867,12 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
/*
* Determine the limit of xlog we can currently read to, and what the
* most recent timeline is.
*
* RecoveryInProgress() will update ThisTimeLineID when it first
* notices recovery finishes, so we only have to maintain it for the
* local process until recovery ends.
*/
if (!RecoveryInProgress())
read_upto = GetFlushRecPtr();
read_upto = GetFlushRecPtr(&currTLI);
else
read_upto = GetXLogReplayRecPtr(&ThisTimeLineID);
tli = ThisTimeLineID;
read_upto = GetXLogReplayRecPtr(&currTLI);
tli = currTLI;
/*
* Check which timeline to get the record from.
@ -890,16 +891,16 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
* archive in the timeline will get renamed to .partial by
* StartupXLOG().
*
* If that happens after our caller updated ThisTimeLineID but before
* If that happens after our caller determined the TLI but before
* we actually read the xlog page, we might still try to read from the
* old (now renamed) segment and fail. There's not much we can do
* about this, but it can only happen when we're a leaf of a cascading
* standby whose primary gets promoted while we're decoding, so a
* one-off ERROR isn't too bad.
*/
XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
XLogReadDetermineTimeline(state, targetPagePtr, reqLen, tli);
if (state->currTLI == ThisTimeLineID)
if (state->currTLI == currTLI)
{
if (loc <= read_upto)

View File

@ -211,7 +211,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
* Compute the current end-of-wal.
*/
if (!RecoveryInProgress())
end_of_wal = GetFlushRecPtr();
end_of_wal = GetFlushRecPtr(NULL);
else
end_of_wal = GetXLogReplayRecPtr(NULL);

View File

@ -2435,7 +2435,7 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
bool *have_pending_txes)
{
dlist_mutable_iter iter;
XLogRecPtr local_flush = GetFlushRecPtr();
XLogRecPtr local_flush = GetFlushRecPtr(NULL);
*write = InvalidXLogRecPtr;
*flush = InvalidXLogRecPtr;

View File

@ -625,7 +625,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
* target position accordingly.
*/
if (!RecoveryInProgress())
moveto = Min(moveto, GetFlushRecPtr());
moveto = Min(moveto, GetFlushRecPtr(NULL));
else
moveto = Min(moveto, GetXLogReplayRecPtr(NULL));

View File

@ -122,10 +122,12 @@ static StringInfoData incoming_message;
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
static void WalRcvDie(int code, Datum arg);
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(bool dying);
static void XLogWalRcvClose(XLogRecPtr recptr);
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len,
TimeLineID tli);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr,
TimeLineID tli);
static void XLogWalRcvFlush(bool dying, TimeLineID tli);
static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli);
static void XLogWalRcvSendReply(bool force, bool requestReply);
static void XLogWalRcvSendHSFeedback(bool immed);
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
@ -255,7 +257,7 @@ WalReceiverMain(void)
pg_atomic_write_u64(&WalRcv->writtenUpto, 0);
/* Arrange to clean up at walreceiver exit */
on_shmem_exit(WalRcvDie, 0);
on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI));
/* Properly accept or ignore signals the postmaster might send us */
pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
@ -394,7 +396,6 @@ WalReceiverMain(void)
options.startpoint = startpoint;
options.slotname = slotname[0] != '\0' ? slotname : NULL;
options.proto.physical.startpointTLI = startpointTLI;
ThisTimeLineID = startpointTLI;
if (walrcv_startstreaming(wrconn, &options))
{
if (first_stream)
@ -462,7 +463,8 @@ WalReceiverMain(void)
*/
last_recv_timestamp = GetCurrentTimestamp();
ping_sent = false;
XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
startpointTLI);
}
else if (len == 0)
break;
@ -487,7 +489,7 @@ WalReceiverMain(void)
* let the startup process and primary server know about
* them.
*/
XLogWalRcvFlush(false);
XLogWalRcvFlush(false, startpointTLI);
}
/* Check if we need to exit the streaming loop. */
@ -608,7 +610,7 @@ WalReceiverMain(void)
{
char xlogfname[MAXFNAMELEN];
XLogWalRcvFlush(false);
XLogWalRcvFlush(false, startpointTLI);
XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
if (close(recvFile) != 0)
ereport(PANIC,
@ -776,9 +778,12 @@ static void
WalRcvDie(int code, Datum arg)
{
WalRcvData *walrcv = WalRcv;
TimeLineID *startpointTLI_p = (TimeLineID *) DatumGetPointer(arg);
Assert(*startpointTLI_p != 0);
/* Ensure that all WAL records received are flushed to disk */
XLogWalRcvFlush(true);
XLogWalRcvFlush(true, *startpointTLI_p);
/* Mark ourselves inactive in shared memory */
SpinLockAcquire(&walrcv->mutex);
@ -808,7 +813,7 @@ WalRcvDie(int code, Datum arg)
* Accept the message from XLOG stream, and process it.
*/
static void
XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
{
int hdrlen;
XLogRecPtr dataStart;
@ -838,7 +843,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
buf += hdrlen;
len -= hdrlen;
XLogWalRcvWrite(buf, len, dataStart);
XLogWalRcvWrite(buf, len, dataStart, tli);
break;
}
case 'k': /* Keepalive */
@ -875,25 +880,27 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
* Write XLOG data to disk.
*/
static void
XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
{
int startoff;
int byteswritten;
Assert(tli != 0);
while (nbytes > 0)
{
int segbytes;
/* Close the current segment if it's completed */
if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
XLogWalRcvClose(recptr);
XLogWalRcvClose(recptr, tli);
if (recvFile < 0)
{
/* Create/use new log file */
XLByteToSeg(recptr, recvSegNo, wal_segment_size);
recvFile = XLogFileInit(recvSegNo);
recvFileTLI = ThisTimeLineID;
recvFile = XLogFileInit(recvSegNo, tli);
recvFileTLI = tli;
}
/* Calculate the start offset of the received logs */
@ -946,7 +953,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
* segment is received and written.
*/
if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
XLogWalRcvClose(recptr);
XLogWalRcvClose(recptr, tli);
}
/*
@ -956,13 +963,15 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
* an error, so we skip sending a reply in that case.
*/
static void
XLogWalRcvFlush(bool dying)
XLogWalRcvFlush(bool dying, TimeLineID tli)
{
Assert(tli != 0);
if (LogstreamResult.Flush < LogstreamResult.Write)
{
WalRcvData *walrcv = WalRcv;
issue_xlog_fsync(recvFile, recvSegNo);
issue_xlog_fsync(recvFile, recvSegNo, tli);
LogstreamResult.Flush = LogstreamResult.Write;
@ -972,7 +981,7 @@ XLogWalRcvFlush(bool dying)
{
walrcv->latestChunkStart = walrcv->flushedUpto;
walrcv->flushedUpto = LogstreamResult.Flush;
walrcv->receivedTLI = ThisTimeLineID;
walrcv->receivedTLI = tli;
}
SpinLockRelease(&walrcv->mutex);
@ -1009,17 +1018,18 @@ XLogWalRcvFlush(bool dying)
* Create an archive notification file since the segment is known completed.
*/
static void
XLogWalRcvClose(XLogRecPtr recptr)
XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
{
char xlogfname[MAXFNAMELEN];
Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size));
Assert(tli != 0);
/*
* fsync() and close current file before we switch to next one. We would
* otherwise have to reopen this file to fsync it later
*/
XLogWalRcvFlush(false);
XLogWalRcvFlush(false, tli);
XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);

View File

@ -230,7 +230,7 @@ static void WalSndShutdown(void) pg_attribute_noreturn();
static void XLogSendPhysical(void);
static void XLogSendLogical(void);
static void WalSndDone(WalSndSendDataCallback send_data);
static XLogRecPtr GetStandbyFlushRecPtr(void);
static XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli);
static void IdentifySystem(void);
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd);
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
@ -385,6 +385,7 @@ IdentifySystem(void)
TupleDesc tupdesc;
Datum values[4];
bool nulls[4];
TimeLineID currTLI;
/*
* Reply with a result set with one row, four columns. First col is system
@ -397,12 +398,9 @@ IdentifySystem(void)
am_cascading_walsender = RecoveryInProgress();
if (am_cascading_walsender)
{
/* this also updates ThisTimeLineID */
logptr = GetStandbyFlushRecPtr();
}
logptr = GetStandbyFlushRecPtr(&currTLI);
else
logptr = GetFlushRecPtr();
logptr = GetFlushRecPtr(&currTLI);
snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr));
@ -441,7 +439,7 @@ IdentifySystem(void)
values[0] = CStringGetTextDatum(sysid);
/* column 2: timeline */
values[1] = Int32GetDatum(ThisTimeLineID);
values[1] = Int32GetDatum(currTLI);
/* column 3: wal location */
values[2] = CStringGetTextDatum(xloc);
@ -537,7 +535,7 @@ ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
if (RecoveryInProgress())
(void) GetXLogReplayRecPtr(&current_timeline);
else
current_timeline = ThisTimeLineID;
current_timeline = GetWALInsertionTimeLine();
timeline_history = readTimeLineHistory(current_timeline);
slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
@ -671,6 +669,7 @@ StartReplication(StartReplicationCmd *cmd)
{
StringInfoData buf;
XLogRecPtr FlushPtr;
TimeLineID FlushTLI;
/* create xlogreader for physical replication */
xlogreader =
@ -710,24 +709,20 @@ StartReplication(StartReplicationCmd *cmd)
/*
* Select the timeline. If it was given explicitly by the client, use
* that. Otherwise use the timeline of the last replayed record, which is
* kept in ThisTimeLineID.
* that. Otherwise use the timeline of the last replayed record.
*/
am_cascading_walsender = RecoveryInProgress();
if (am_cascading_walsender)
{
/* this also updates ThisTimeLineID */
FlushPtr = GetStandbyFlushRecPtr();
}
FlushPtr = GetStandbyFlushRecPtr(&FlushTLI);
else
FlushPtr = GetFlushRecPtr();
FlushPtr = GetFlushRecPtr(&FlushTLI);
if (cmd->timeline != 0)
{
XLogRecPtr switchpoint;
sendTimeLine = cmd->timeline;
if (sendTimeLine == ThisTimeLineID)
if (sendTimeLine == FlushTLI)
{
sendTimeLineIsHistoric = false;
sendTimeLineValidUpto = InvalidXLogRecPtr;
@ -742,7 +737,7 @@ StartReplication(StartReplicationCmd *cmd)
* Check that the timeline the client requested exists, and the
* requested start location is on that timeline.
*/
timeLineHistory = readTimeLineHistory(ThisTimeLineID);
timeLineHistory = readTimeLineHistory(FlushTLI);
switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
&sendTimeLineNextTLI);
list_free_deep(timeLineHistory);
@ -781,7 +776,7 @@ StartReplication(StartReplicationCmd *cmd)
}
else
{
sendTimeLine = ThisTimeLineID;
sendTimeLine = FlushTLI;
sendTimeLineValidUpto = InvalidXLogRecPtr;
sendTimeLineIsHistoric = false;
}
@ -909,9 +904,16 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
int count;
WALReadError errinfo;
XLogSegNo segno;
TimeLineID currTLI = GetWALInsertionTimeLine();
XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
/*
* Since logical decoding is only permitted on a primary server, we know
* that the current timeline ID can't be changing any more. If we did this
* on a standby, we'd have to worry about the values we compute here
* becoming invalid due to a promotion or timeline change.
*/
XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
sendTimeLineIsHistoric = (state->currTLI != currTLI);
sendTimeLine = state->currTLI;
sendTimeLineValidUpto = state->currTLIValidUntil;
sendTimeLineNextTLI = state->nextTLI;
@ -1487,7 +1489,7 @@ WalSndWaitForWal(XLogRecPtr loc)
/* Get a more recent flush pointer. */
if (!RecoveryInProgress())
RecentFlushPtr = GetFlushRecPtr();
RecentFlushPtr = GetFlushRecPtr(NULL);
else
RecentFlushPtr = GetXLogReplayRecPtr(NULL);
@ -1521,7 +1523,7 @@ WalSndWaitForWal(XLogRecPtr loc)
/* Update our idea of the currently flushed position. */
if (!RecoveryInProgress())
RecentFlushPtr = GetFlushRecPtr();
RecentFlushPtr = GetFlushRecPtr(NULL);
else
RecentFlushPtr = GetXLogReplayRecPtr(NULL);
@ -2683,6 +2685,8 @@ XLogSendPhysical(void)
}
else if (am_cascading_walsender)
{
TimeLineID SendRqstTLI;
/*
* Streaming the latest timeline on a standby.
*
@ -2702,14 +2706,12 @@ XLogSendPhysical(void)
*/
bool becameHistoric = false;
SendRqstPtr = GetStandbyFlushRecPtr();
SendRqstPtr = GetStandbyFlushRecPtr(&SendRqstTLI);
if (!RecoveryInProgress())
{
/*
* We have been promoted. RecoveryInProgress() updated
* ThisTimeLineID to the new current timeline.
*/
/* We have been promoted. */
SendRqstTLI = GetWALInsertionTimeLine();
am_cascading_walsender = false;
becameHistoric = true;
}
@ -2717,10 +2719,9 @@ XLogSendPhysical(void)
{
/*
* Still a cascading standby. But is the timeline we're sending
* still the one recovery is recovering from? ThisTimeLineID was
* updated by the GetStandbyFlushRecPtr() call above.
* still the one recovery is recovering from?
*/
if (sendTimeLine != ThisTimeLineID)
if (sendTimeLine != SendRqstTLI)
becameHistoric = true;
}
@ -2733,7 +2734,7 @@ XLogSendPhysical(void)
*/
List *history;
history = readTimeLineHistory(ThisTimeLineID);
history = readTimeLineHistory(SendRqstTLI);
sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
Assert(sendTimeLine < sendTimeLineNextTLI);
@ -2756,7 +2757,7 @@ XLogSendPhysical(void)
* primary: if the primary subsequently crashes and restarts, standbys
* must not have applied any WAL that got lost on the primary.
*/
SendRqstPtr = GetFlushRecPtr();
SendRqstPtr = GetFlushRecPtr(NULL);
}
/*
@ -2997,9 +2998,9 @@ XLogSendLogical(void)
* we only need to update flushPtr if EndRecPtr is past it.
*/
if (flushPtr == InvalidXLogRecPtr)
flushPtr = GetFlushRecPtr();
flushPtr = GetFlushRecPtr(NULL);
else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
flushPtr = GetFlushRecPtr();
flushPtr = GetFlushRecPtr(NULL);
/* If EndRecPtr is still past our flushPtr, it means we caught up. */
if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
@ -3069,11 +3070,11 @@ WalSndDone(WalSndSendDataCallback send_data)
* can be sent to the standby. This should only be called when in recovery,
* ie. we're streaming to a cascaded standby.
*
* As a side-effect, ThisTimeLineID is updated to the TLI of the last
* As a side-effect, *tli is updated to the TLI of the last
* replayed WAL record.
*/
static XLogRecPtr
GetStandbyFlushRecPtr(void)
GetStandbyFlushRecPtr(TimeLineID *tli)
{
XLogRecPtr replayPtr;
TimeLineID replayTLI;
@ -3090,10 +3091,10 @@ GetStandbyFlushRecPtr(void)
receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
replayPtr = GetXLogReplayRecPtr(&replayTLI);
ThisTimeLineID = replayTLI;
*tli = replayTLI;
result = replayPtr;
if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
if (receiveTLI == replayTLI && receivePtr > replayPtr)
result = receivePtr;
return result;

View File

@ -29,8 +29,6 @@
#define SYNC_METHOD_OPEN_DSYNC 4 /* for O_DSYNC */
extern int sync_method;
extern PGDLLIMPORT TimeLineID ThisTimeLineID; /* current TLI */
/*
* Recovery target type.
* Only set during a Point in Time recovery, not when in standby mode.
@ -262,7 +260,7 @@ extern XLogRecPtr XLogInsertRecord(struct XLogRecData *rdata,
extern void XLogFlush(XLogRecPtr RecPtr);
extern bool XLogBackgroundFlush(void);
extern bool XLogNeedsFlush(XLogRecPtr RecPtr);
extern int XLogFileInit(XLogSegNo segno);
extern int XLogFileInit(XLogSegNo segno, TimeLineID tli);
extern int XLogFileOpen(XLogSegNo segno);
extern void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli);
@ -274,7 +272,7 @@ extern void xlog_redo(XLogReaderState *record);
extern void xlog_desc(StringInfo buf, XLogReaderState *record);
extern const char *xlog_identify(uint8 info);
extern void issue_xlog_fsync(int fd, XLogSegNo segno);
extern void issue_xlog_fsync(int fd, XLogSegNo segno, TimeLineID tli);
extern bool RecoveryInProgress(void);
extern RecoveryState GetRecoveryState(void);
@ -312,7 +310,8 @@ extern void UpdateFullPageWrites(void);
extern void GetFullPageWriteInfo(XLogRecPtr *RedoRecPtr_p, bool *doPageWrites_p);
extern XLogRecPtr GetRedoRecPtr(void);
extern XLogRecPtr GetInsertRecPtr(void);
extern XLogRecPtr GetFlushRecPtr(void);
extern XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI);
extern TimeLineID GetWALInsertionTimeLine(void);
extern XLogRecPtr GetLastImportantRecPtr(void);
extern void RemovePromoteSignalFiles(void);

View File

@ -24,7 +24,7 @@ extern void ExecuteRecoveryCommand(const char *command, const char *commandName,
bool failOnSignal);
extern void KeepFileRestoredFromArchive(const char *path, const char *xlogfname);
extern void XLogArchiveNotify(const char *xlog);
extern void XLogArchiveNotifySeg(XLogSegNo segno);
extern void XLogArchiveNotifySeg(XLogSegNo segno, TimeLineID tli);
extern void XLogArchiveForceDone(const char *xlog);
extern bool XLogArchiveCheckDone(const char *xlog);
extern bool XLogArchiveIsBusy(const char *xlog);

View File

@ -98,7 +98,9 @@ extern void wal_segment_open(XLogReaderState *state,
extern void wal_segment_close(XLogReaderState *state);
extern void XLogReadDetermineTimeline(XLogReaderState *state,
XLogRecPtr wantPage, uint32 wantLength);
XLogRecPtr wantPage,
uint32 wantLength,
TimeLineID currTLI);
extern void WALReadRaiseError(WALReadError *errinfo);