From 0dc8ead46363fec6f621a12c7e1f889ba73b55a9 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera Date: Mon, 25 Nov 2019 15:04:54 -0300 Subject: [PATCH] Refactor WAL file-reading code into WALRead() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit XLogReader, walsender and pg_waldump all had their own routines to read data from WAL files to memory, with slightly different approaches according to the particular conditions of each environment. There's a lot of commonality, so we can refactor that into a single routine WALRead in XLogReader, and move the differences to a separate (simpler) callback that just opens the next WAL-segment. This results in a clearer (ahem) code flow. The error reporting needs are covered by filling in a new error-info struct, WALReadError, and it's the caller's responsibility to act on it. The backend has WALReadRaiseError() to do so. We no longer ever need to seek in this interface; switch to using pg_pread(). Author: Antonin Houska, with contributions from Álvaro Herrera Reviewed-by: Michaël Paquier, Kyotaro Horiguchi Discussion: https://postgr.es/m/14984.1554998742@spoje.net --- src/backend/access/transam/xlogreader.c | 106 ++++++++- src/backend/access/transam/xlogutils.c | 205 ++++++---------- src/backend/replication/walsender.c | 300 ++++++++++-------------- src/bin/pg_waldump/pg_waldump.c | 168 +++++-------- src/include/access/xlogreader.h | 39 ++- src/include/access/xlogutils.h | 2 + 6 files changed, 387 insertions(+), 433 deletions(-) diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 7f24f0cb95..67418b05f1 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -17,6 +17,8 @@ */ #include "postgres.h" +#include <unistd.h> + #include "access/transam.h" #include "access/xlog_internal.h" #include "access/xlogreader.h" @@ -27,6 +29,7 @@ #ifndef FRONTEND #include "miscadmin.h" +#include "pgstat.h" #include 
"utils/memutils.h" #endif @@ -208,7 +211,6 @@ WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, { seg->ws_file = -1; seg->ws_segno = 0; - seg->ws_off = 0; seg->ws_tli = 0; segcxt->ws_segsize = segsize; @@ -295,8 +297,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) * byte to cover the whole record header, or at least the part of it that * fits on the same page. */ - readOff = ReadPageInternal(state, - targetPagePtr, + readOff = ReadPageInternal(state, targetPagePtr, Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ)); if (readOff < 0) goto err; @@ -556,7 +557,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) /* check whether we have all the requested data already */ if (targetSegNo == state->seg.ws_segno && - targetPageOff == state->seg.ws_off && reqLen <= state->readLen) + targetPageOff == state->segoff && reqLen <= state->readLen) return state->readLen; /* @@ -627,7 +628,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) /* update read state information */ state->seg.ws_segno = targetSegNo; - state->seg.ws_off = targetPageOff; + state->segoff = targetPageOff; state->readLen = readLen; return readLen; @@ -644,7 +645,7 @@ static void XLogReaderInvalReadState(XLogReaderState *state) { state->seg.ws_segno = 0; - state->seg.ws_off = 0; + state->segoff = 0; state->readLen = 0; } @@ -1015,6 +1016,99 @@ out: #endif /* FRONTEND */ +/* + * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL + * fetched from timeline 'tli'. + * + * 'seg/segcxt' identify the last segment used. 'openSegment' is a callback + * to open the next segment, if necessary. + * + * Returns true if succeeded, false if an error occurs, in which case + * 'errinfo' receives error details. + * + * XXX probably this should be improved to suck data directly from the + * WAL buffers when possible. 
+ */ +bool +WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, + WALOpenSegment *seg, WALSegmentContext *segcxt, + WALSegmentOpen openSegment, WALReadError *errinfo) +{ + char *p; + XLogRecPtr recptr; + Size nbytes; + + p = buf; + recptr = startptr; + nbytes = count; + + while (nbytes > 0) + { + uint32 startoff; + int segbytes; + int readbytes; + + startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize); + + /* + * If the data we want is not in a segment we have open, close what we + * have (if anything) and open the next one, using the caller's + * provided openSegment callback. + */ + if (seg->ws_file < 0 || + !XLByteInSeg(recptr, seg->ws_segno, segcxt->ws_segsize) || + tli != seg->ws_tli) + { + XLogSegNo nextSegNo; + + if (seg->ws_file >= 0) + close(seg->ws_file); + + XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize); + seg->ws_file = openSegment(nextSegNo, segcxt, &tli); + + /* Update the current segment info. */ + seg->ws_tli = tli; + seg->ws_segno = nextSegNo; + } + + /* How many bytes are within this segment? */ + if (nbytes > (segcxt->ws_segsize - startoff)) + segbytes = segcxt->ws_segsize - startoff; + else + segbytes = nbytes; + +#ifndef FRONTEND + pgstat_report_wait_start(WAIT_EVENT_WAL_READ); +#endif + + /* Reset errno first; eases reporting non-errno-affecting errors */ + errno = 0; + readbytes = pg_pread(seg->ws_file, p, segbytes, (off_t) startoff); + +#ifndef FRONTEND + pgstat_report_wait_end(); +#endif + + if (readbytes <= 0) + { + errinfo->wre_errno = errno; + errinfo->wre_req = segbytes; + errinfo->wre_read = readbytes; + errinfo->wre_off = startoff; + errinfo->wre_seg = *seg; + return false; + } + + /* Update state for read */ + recptr += readbytes; + nbytes -= readbytes; + p += readbytes; + } + + return true; +} + /* ---------------------------------------- * Functions for decoding the data and block references in a record. 
* ---------------------------------------- diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 5f1e5ba75d..446760ed6e 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -639,128 +639,6 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum, forget_invalid_pages(rnode, forkNum, nblocks); } -/* - * Read 'count' bytes from WAL into 'buf', starting at location 'startptr' - * in timeline 'tli'. - * - * Will open, and keep open, one WAL segment stored in the static file - * descriptor 'sendFile'. This means if XLogRead is used once, there will - * always be one descriptor left open until the process ends, but never - * more than one. - * - * XXX This is very similar to pg_waldump's XLogDumpXLogRead and to XLogRead - * in walsender.c but for small differences (such as lack of elog() in - * frontend). Probably these should be merged at some point. - */ -static void -XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr, - Size count) -{ - char *p; - XLogRecPtr recptr; - Size nbytes; - - /* state maintained across calls */ - static int sendFile = -1; - static XLogSegNo sendSegNo = 0; - static TimeLineID sendTLI = 0; - static uint32 sendOff = 0; - - Assert(segsize == wal_segment_size); - - p = buf; - recptr = startptr; - nbytes = count; - - while (nbytes > 0) - { - uint32 startoff; - int segbytes; - int readbytes; - - startoff = XLogSegmentOffset(recptr, segsize); - - /* Do we need to switch to a different xlog segment? 
*/ - if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, segsize) || - sendTLI != tli) - { - char path[MAXPGPATH]; - - if (sendFile >= 0) - close(sendFile); - - XLByteToSeg(recptr, sendSegNo, segsize); - - XLogFilePath(path, tli, sendSegNo, segsize); - - sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY); - - if (sendFile < 0) - { - if (errno == ENOENT) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("requested WAL segment %s has already been removed", - path))); - else - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open file \"%s\": %m", - path))); - } - sendOff = 0; - sendTLI = tli; - } - - /* Need to seek in the file? */ - if (sendOff != startoff) - { - if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0) - { - char path[MAXPGPATH]; - int save_errno = errno; - - XLogFilePath(path, tli, sendSegNo, segsize); - errno = save_errno; - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not seek in log segment %s to offset %u: %m", - path, startoff))); - } - sendOff = startoff; - } - - /* How many bytes are within this segment? */ - if (nbytes > (segsize - startoff)) - segbytes = segsize - startoff; - else - segbytes = nbytes; - - pgstat_report_wait_start(WAIT_EVENT_WAL_READ); - readbytes = read(sendFile, p, segbytes); - pgstat_report_wait_end(); - if (readbytes <= 0) - { - char path[MAXPGPATH]; - int save_errno = errno; - - XLogFilePath(path, tli, sendSegNo, segsize); - errno = save_errno; - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read from log segment %s, offset %u, length %lu: %m", - path, sendOff, (unsigned long) segbytes))); - } - - /* Update state for read */ - recptr += readbytes; - - sendOff += readbytes; - nbytes -= readbytes; - p += readbytes; - } -} - /* * Determine which timeline to read an xlog page from and set the * XLogReaderState's currTLI to that timeline ID. 
@@ -802,8 +680,8 @@ XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr, void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength) { - const XLogRecPtr lastReadPage = state->seg.ws_segno * - state->segcxt.ws_segsize + state->seg.ws_off; + const XLogRecPtr lastReadPage = (state->seg.ws_segno * + state->segcxt.ws_segsize + state->segoff); Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0); Assert(wantLength <= XLOG_BLCKSZ); @@ -896,6 +774,34 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa } } +/* openSegment callback for WALRead */ +static int +wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext *segcxt, + TimeLineID *tli_p) +{ + TimeLineID tli = *tli_p; + char path[MAXPGPATH]; + int fd; + + XLogFilePath(path, tli, nextSegNo, segcxt->ws_segsize); + fd = BasicOpenFile(path, O_RDONLY | PG_BINARY); + if (fd >= 0) + return fd; + + if (errno == ENOENT) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("requested WAL segment %s has already been removed", + path))); + else + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", + path))); + + return -1; /* keep compiler quiet */ +} + /* * read_page callback for reading local xlog files * @@ -913,7 +819,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, { XLogRecPtr read_upto, loc; + TimeLineID tli; int count; + WALReadError errinfo; loc = targetPagePtr + reqLen; @@ -932,7 +840,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, read_upto = GetFlushRecPtr(); else read_upto = GetXLogReplayRecPtr(&ThisTimeLineID); - state->seg.ws_tli = ThisTimeLineID; + tli = ThisTimeLineID; /* * Check which timeline to get the record from. 
@@ -982,14 +890,14 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, read_upto = state->currTLIValidUntil; /* - * Setting ws_tli to our wanted record's TLI is slightly wrong; - * the page might begin on an older timeline if it contains a - * timeline switch, since its xlog segment will have been copied - * from the prior timeline. This is pretty harmless though, as - * nothing cares so long as the timeline doesn't go backwards. We - * should read the page header instead; FIXME someday. + * Setting tli to our wanted record's TLI is slightly wrong; the + * page might begin on an older timeline if it contains a timeline + * switch, since its xlog segment will have been copied from the + * prior timeline. This is pretty harmless though, as nothing + * cares so long as the timeline doesn't go backwards. We should + * read the page header instead; FIXME someday. */ - state->seg.ws_tli = state->currTLI; + tli = state->currTLI; /* No need to wait on a historical timeline */ break; @@ -1020,9 +928,38 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, * as 'count', read the whole page anyway. It's guaranteed to be * zero-padded up to the page boundary if it's incomplete. */ - XLogRead(cur_page, state->segcxt.ws_segsize, state->seg.ws_tli, targetPagePtr, - XLOG_BLCKSZ); + if (!WALRead(cur_page, targetPagePtr, XLOG_BLCKSZ, tli, &state->seg, + &state->segcxt, wal_segment_open, &errinfo)) + WALReadRaiseError(&errinfo); /* number of valid bytes in the buffer */ return count; } + +/* + * Backend-specific convenience code to handle read errors encountered by + * WALRead(). 
+ */ +void +WALReadRaiseError(WALReadError *errinfo) +{ + WALOpenSegment *seg = &errinfo->wre_seg; + char *fname = XLogFileNameP(seg->ws_tli, seg->ws_segno); + + if (errinfo->wre_read < 0) + { + errno = errinfo->wre_errno; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from log segment %s, offset %u: %m", + fname, errinfo->wre_off))); + } + else if (errinfo->wre_read == 0) + { + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("could not read from log segment %s, offset %u: read %d of %zu", + fname, errinfo->wre_off, errinfo->wre_read, + (Size) errinfo->wre_req))); + } +} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index cbc928501a..ac9209747a 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -248,8 +248,9 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time); static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now); static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); +static int WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt, + TimeLineID *tli_p); static void UpdateSpillStats(LogicalDecodingContext *ctx); -static void XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count); /* Initialize walsender process before entering the main command loop */ @@ -767,6 +768,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req { XLogRecPtr flushptr; int count; + WALReadError errinfo; + XLogSegNo segno; XLogReadDetermineTimeline(state, targetPagePtr, reqLen); sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID); @@ -787,7 +790,27 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req count = flushptr - targetPagePtr; /* part of the page available */ /* now actually read the data, we know it's there */ - XLogRead(sendCxt, cur_page, targetPagePtr, XLOG_BLCKSZ); + if (!WALRead(cur_page, + 
targetPagePtr, + XLOG_BLCKSZ, + sendSeg->ws_tli, /* Pass the current TLI because only + * WalSndSegmentOpen controls whether new + * TLI is needed. */ + sendSeg, + sendCxt, + WalSndSegmentOpen, + &errinfo)) + WALReadRaiseError(&errinfo); + + /* + * After reading into the buffer, check that what we read was valid. We do + * this after reading, because even though the segment was present when we + * opened it, it might get recycled or removed while we read it. The + * read() succeeds in that case, but the data we tried to read might + * already have been overwritten with new WAL records. + */ + XLByteToSeg(targetPagePtr, segno, sendCxt->ws_segsize); + CheckXLogRemoved(segno, sendSeg->ws_tli); return count; } @@ -2360,189 +2383,68 @@ WalSndKill(int code, Datum arg) SpinLockRelease(&walsnd->mutex); } -/* - * Read 'count' bytes from WAL into 'buf', starting at location 'startptr' - * - * XXX probably this should be improved to suck data directly from the - * WAL buffers when possible. - * - * Will open, and keep open, one WAL segment stored in the global file - * descriptor sendFile. This means if XLogRead is used once, there will - * always be one descriptor left open until the process ends, but never - * more than one. - */ -static void -XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count) +/* walsender's openSegment callback for WALRead */ +static int +WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt, + TimeLineID *tli_p) { - char *p; - XLogRecPtr recptr; - Size nbytes; - XLogSegNo segno; + char path[MAXPGPATH]; + int fd; -retry: - p = buf; - recptr = startptr; - nbytes = count; - - while (nbytes > 0) + /*------- + * When reading from a historic timeline, and there is a timeline switch + * within this segment, read from the WAL segment belonging to the new + * timeline. + * + * For example, imagine that this server is currently on timeline 5, and + * we're streaming timeline 4. 
The switch from timeline 4 to 5 happened at + * 0/13002088. In pg_wal, we have these files: + * + * ... + * 000000040000000000000012 + * 000000040000000000000013 + * 000000050000000000000013 + * 000000050000000000000014 + * ... + * + * In this situation, when requested to send the WAL from segment 0x13, on + * timeline 4, we read the WAL from file 000000050000000000000013. Archive + * recovery prefers files from newer timelines, so if the segment was + * restored from the archive on this server, the file belonging to the old + * timeline, 000000040000000000000013, might not exist. Their contents are + * equal up to the switchpoint, because at a timeline switch, the used + * portion of the old segment is copied to the new file. ------- + */ + *tli_p = sendTimeLine; + if (sendTimeLineIsHistoric) { - uint32 startoff; - int segbytes; - int readbytes; + XLogSegNo endSegNo; - startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize); - - if (sendSeg->ws_file < 0 || - !XLByteInSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize)) - { - char path[MAXPGPATH]; - - /* Switch to another logfile segment */ - if (sendSeg->ws_file >= 0) - close(sendSeg->ws_file); - - XLByteToSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize); - - /*------- - * When reading from a historic timeline, and there is a timeline - * switch within this segment, read from the WAL segment belonging - * to the new timeline. - * - * For example, imagine that this server is currently on timeline - * 5, and we're streaming timeline 4. The switch from timeline 4 - * to 5 happened at 0/13002088. In pg_wal, we have these files: - * - * ... - * 000000040000000000000012 - * 000000040000000000000013 - * 000000050000000000000013 - * 000000050000000000000014 - * ... - * - * In this situation, when requested to send the WAL from - * segment 0x13, on timeline 4, we read the WAL from file - * 000000050000000000000013. 
Archive recovery prefers files from - * newer timelines, so if the segment was restored from the - * archive on this server, the file belonging to the old timeline, - * 000000040000000000000013, might not exist. Their contents are - * equal up to the switchpoint, because at a timeline switch, the - * used portion of the old segment is copied to the new file. - *------- - */ - sendSeg->ws_tli = sendTimeLine; - if (sendTimeLineIsHistoric) - { - XLogSegNo endSegNo; - - XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize); - if (sendSeg->ws_segno == endSegNo) - sendSeg->ws_tli = sendTimeLineNextTLI; - } - - XLogFilePath(path, sendSeg->ws_tli, sendSeg->ws_segno, segcxt->ws_segsize); - - sendSeg->ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY); - if (sendSeg->ws_file < 0) - { - /* - * If the file is not found, assume it's because the standby - * asked for a too old WAL segment that has already been - * removed or recycled. - */ - if (errno == ENOENT) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("requested WAL segment %s has already been removed", - XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno)))); - else - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open file \"%s\": %m", - path))); - } - sendSeg->ws_off = 0; - } - - /* Need to seek in the file? */ - if (sendSeg->ws_off != startoff) - { - if (lseek(sendSeg->ws_file, (off_t) startoff, SEEK_SET) < 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not seek in log segment %s to offset %u: %m", - XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno), - startoff))); - sendSeg->ws_off = startoff; - } - - /* How many bytes are within this segment? 
*/ - if (nbytes > (segcxt->ws_segsize - startoff)) - segbytes = segcxt->ws_segsize - startoff; - else - segbytes = nbytes; - - pgstat_report_wait_start(WAIT_EVENT_WAL_READ); - readbytes = read(sendSeg->ws_file, p, segbytes); - pgstat_report_wait_end(); - if (readbytes < 0) - { - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read from log segment %s, offset %u, length %zu: %m", - XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno), - sendSeg->ws_off, (Size) segbytes))); - } - else if (readbytes == 0) - { - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("could not read from log segment %s, offset %u: read %d of %zu", - XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno), - sendSeg->ws_off, readbytes, (Size) segbytes))); - } - - /* Update state for read */ - recptr += readbytes; - - sendSeg->ws_off += readbytes; - nbytes -= readbytes; - p += readbytes; + XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize); + if (sendSeg->ws_segno == endSegNo) + *tli_p = sendTimeLineNextTLI; } - /* - * After reading into the buffer, check that what we read was valid. We do - * this after reading, because even though the segment was present when we - * opened it, it might get recycled or removed while we read it. The - * read() succeeds in that case, but the data we tried to read might - * already have been overwritten with new WAL records. - */ - XLByteToSeg(startptr, segno, segcxt->ws_segsize); - CheckXLogRemoved(segno, ThisTimeLineID); + XLogFilePath(path, *tli_p, nextSegNo, segcxt->ws_segsize); + fd = BasicOpenFile(path, O_RDONLY | PG_BINARY); + if (fd >= 0) + return fd; /* - * During recovery, the currently-open WAL file might be replaced with the - * file of the same name retrieved from archive. So we always need to - * check what we read was valid after reading into the buffer. If it's - * invalid, we try to open and read the file again. 
+ * If the file is not found, assume it's because the standby asked for a + * too old WAL segment that has already been removed or recycled. */ - if (am_cascading_walsender) - { - WalSnd *walsnd = MyWalSnd; - bool reload; - - SpinLockAcquire(&walsnd->mutex); - reload = walsnd->needreload; - walsnd->needreload = false; - SpinLockRelease(&walsnd->mutex); - - if (reload && sendSeg->ws_file >= 0) - { - close(sendSeg->ws_file); - sendSeg->ws_file = -1; - - goto retry; - } - } + if (errno == ENOENT) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("requested WAL segment %s has already been removed", + XLogFileNameP(*tli_p, nextSegNo)))); + else + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", + path))); + return -1; /* keep compiler quiet */ } /* @@ -2562,6 +2464,8 @@ XLogSendPhysical(void) XLogRecPtr startptr; XLogRecPtr endptr; Size nbytes; + XLogSegNo segno; + WALReadError errinfo; /* If requested switch the WAL sender to the stopping state. */ if (got_STOPPING) @@ -2777,7 +2681,49 @@ XLogSendPhysical(void) * calls. */ enlargeStringInfo(&output_message, nbytes); - XLogRead(sendCxt, &output_message.data[output_message.len], startptr, nbytes); + +retry: + if (!WALRead(&output_message.data[output_message.len], + startptr, + nbytes, + sendSeg->ws_tli, /* Pass the current TLI because only + * WalSndSegmentOpen controls whether new + * TLI is needed. */ + sendSeg, + sendCxt, + WalSndSegmentOpen, + &errinfo)) + WALReadRaiseError(&errinfo); + + /* See logical_read_xlog_page(). */ + XLByteToSeg(startptr, segno, sendCxt->ws_segsize); + CheckXLogRemoved(segno, sendSeg->ws_tli); + + /* + * During recovery, the currently-open WAL file might be replaced with the + * file of the same name retrieved from archive. So we always need to + * check what we read was valid after reading into the buffer. If it's + * invalid, we try to open and read the file again. 
+ */ + if (am_cascading_walsender) + { + WalSnd *walsnd = MyWalSnd; + bool reload; + + SpinLockAcquire(&walsnd->mutex); + reload = walsnd->needreload; + walsnd->needreload = false; + SpinLockRelease(&walsnd->mutex); + + if (reload && sendSeg->ws_file >= 0) + { + close(sendSeg->ws_file); + sendSeg->ws_file = -1; + + goto retry; + } + } + output_message.len += nbytes; output_message.data[output_message.len] = '\0'; diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index d6695f7196..30a5851d87 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -280,137 +280,57 @@ identify_target_directory(char *directory, char *fname) return NULL; /* not reached */ } -/* - * Read count bytes from a segment file in the specified directory, for the - * given timeline, containing the specified record pointer; store the data in - * the passed buffer. - */ -static void -XLogDumpXLogRead(const char *directory, TimeLineID timeline_id, - XLogRecPtr startptr, char *buf, Size count) +/* pg_waldump's openSegment callback for WALRead */ +static int +WALDumpOpenSegment(XLogSegNo nextSegNo, WALSegmentContext *segcxt, + TimeLineID *tli_p) { - char *p; - XLogRecPtr recptr; - Size nbytes; + TimeLineID tli = *tli_p; + char fname[MAXPGPATH]; + int fd; + int tries; - static int sendFile = -1; - static XLogSegNo sendSegNo = 0; - static uint32 sendOff = 0; + XLogFileName(fname, tli, nextSegNo, segcxt->ws_segsize); - p = buf; - recptr = startptr; - nbytes = count; - - while (nbytes > 0) + /* + * In follow mode there is a short period of time after the server has + * written the end of the previous file before the new file is available. + * So we loop for 5 seconds looking for the file to appear before giving + * up. 
+ */ + for (tries = 0; tries < 10; tries++) { - uint32 startoff; - int segbytes; - int readbytes; - - startoff = XLogSegmentOffset(recptr, WalSegSz); - - if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, WalSegSz)) + fd = open_file_in_directory(segcxt->ws_dir, fname); + if (fd >= 0) + return fd; + if (errno == ENOENT) { - char fname[MAXFNAMELEN]; - int tries; - - /* Switch to another logfile segment */ - if (sendFile >= 0) - close(sendFile); - - XLByteToSeg(recptr, sendSegNo, WalSegSz); - - XLogFileName(fname, timeline_id, sendSegNo, WalSegSz); - - /* - * In follow mode there is a short period of time after the server - * has written the end of the previous file before the new file is - * available. So we loop for 5 seconds looking for the file to - * appear before giving up. - */ - for (tries = 0; tries < 10; tries++) - { - sendFile = open_file_in_directory(directory, fname); - if (sendFile >= 0) - break; - if (errno == ENOENT) - { - int save_errno = errno; - - /* File not there yet, try again */ - pg_usleep(500 * 1000); - - errno = save_errno; - continue; - } - /* Any other error, fall through and fail */ - break; - } - - if (sendFile < 0) - fatal_error("could not find file \"%s\": %s", - fname, strerror(errno)); - sendOff = 0; - } - - /* Need to seek in the file? */ - if (sendOff != startoff) - { - if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0) - { - int err = errno; - char fname[MAXPGPATH]; - - XLogFileName(fname, timeline_id, sendSegNo, WalSegSz); - - fatal_error("could not seek in log file %s to offset %u: %s", - fname, startoff, strerror(err)); - } - sendOff = startoff; - } - - /* How many bytes are within this segment? 
*/ - if (nbytes > (WalSegSz - startoff)) - segbytes = WalSegSz - startoff; - else - segbytes = nbytes; - - readbytes = read(sendFile, p, segbytes); - if (readbytes <= 0) - { - int err = errno; - char fname[MAXPGPATH]; int save_errno = errno; - XLogFileName(fname, timeline_id, sendSegNo, WalSegSz); + /* File not there yet, try again */ + pg_usleep(500 * 1000); + errno = save_errno; - - if (readbytes < 0) - fatal_error("could not read from log file %s, offset %u, length %d: %s", - fname, sendOff, segbytes, strerror(err)); - else if (readbytes == 0) - fatal_error("could not read from log file %s, offset %u: read %d of %zu", - fname, sendOff, readbytes, (Size) segbytes); + continue; } - - /* Update state for read */ - recptr += readbytes; - - sendOff += readbytes; - nbytes -= readbytes; - p += readbytes; + /* Any other error, fall through and fail */ + break; } + + fatal_error("could not find file \"%s\": %s", fname, strerror(errno)); + return -1; /* keep compiler quiet */ } /* * XLogReader read_page callback */ static int -XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, - XLogRecPtr targetPtr, char *readBuff) +WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, + XLogRecPtr targetPtr, char *readBuff) { XLogDumpPrivate *private = state->private_data; int count = XLOG_BLCKSZ; + WALReadError errinfo; if (private->endptr != InvalidXLogRecPtr) { @@ -425,8 +345,26 @@ XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, } } - XLogDumpXLogRead(state->segcxt.ws_dir, private->timeline, targetPagePtr, - readBuff, count); + if (!WALRead(readBuff, targetPagePtr, count, private->timeline, + &state->seg, &state->segcxt, WALDumpOpenSegment, &errinfo)) + { + WALOpenSegment *seg = &errinfo.wre_seg; + char fname[MAXPGPATH]; + + XLogFileName(fname, seg->ws_tli, seg->ws_segno, + state->segcxt.ws_segsize); + + if (errinfo.wre_errno != 0) + { + errno = errinfo.wre_errno; + fatal_error("could not read from 
file %s, offset %u: %m", + fname, errinfo.wre_off); + } + else + fatal_error("could not read from file %s, offset %u: read %d of %zu", + fname, errinfo.wre_off, errinfo.wre_read, + (Size) errinfo.wre_req); + } return count; } @@ -1089,7 +1027,7 @@ main(int argc, char **argv) /* done with argument parsing, do the actual work */ /* we have everything we need, start reading */ - xlogreader_state = XLogReaderAllocate(WalSegSz, waldir, XLogDumpReadPage, + xlogreader_state = XLogReaderAllocate(WalSegSz, waldir, WALDumpReadPage, &private); if (!xlogreader_state) fatal_error("out of memory"); diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 1bbee386e8..0193611b7f 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -36,7 +36,6 @@ typedef struct WALOpenSegment { int ws_file; /* segment file descriptor */ XLogSegNo ws_segno; /* segment number */ - uint32 ws_off; /* offset in the segment */ TimeLineID ws_tli; /* timeline ID of the currently open file */ } WALOpenSegment; @@ -168,6 +167,7 @@ struct XLogReaderState /* last read XLOG position for data currently in readBuf */ WALSegmentContext segcxt; WALOpenSegment seg; + uint32 segoff; /* * beginning of prior page read, and its TLI. Doesn't necessarily @@ -217,6 +217,24 @@ extern XLogReaderState *XLogReaderAllocate(int wal_segment_size, /* Free an XLogReader */ extern void XLogReaderFree(XLogReaderState *state); +/* + * Callback to open the specified WAL segment for reading. Returns a valid + * file descriptor when the file was opened successfully. + * + * "nextSegNo" is the number of the segment to be opened. + * + * "segcxt" is additional information about the segment. + * + * "tli_p" is an input/output argument. WALRead() uses it to pass the + * timeline in which the new segment should be found, but the callback can use + * it to return the TLI that it actually opened. 
+ * BasicOpenFile() is the preferred way to open the segment file in backend + * code, whereas open(2) should be used in frontend. + */ +typedef int (*WALSegmentOpen) (XLogSegNo nextSegNo, WALSegmentContext *segcxt, + TimeLineID *tli_p); + /* Initialize supporting structures */ extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, int segsize, const char *waldir); @@ -232,6 +250,25 @@ extern bool XLogReaderValidatePageHeader(XLogReaderState *state, #ifdef FRONTEND extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr); #endif /* FRONTEND */ + +/* + * Error information from WALRead that both backend and frontend callers can + * process. Currently only errors from pg_pread can be reported. + */ +typedef struct WALReadError +{ + int wre_errno; /* errno set by the last pg_pread() */ + int wre_off; /* Offset we tried to read from. */ + int wre_req; /* Bytes requested to be read. */ + int wre_read; /* Bytes read by the last pg_pread(). */ + WALOpenSegment wre_seg; /* Segment we tried to read from. */ +} WALReadError; + +extern bool WALRead(char *buf, XLogRecPtr startptr, Size count, + TimeLineID tli, WALOpenSegment *seg, + WALSegmentContext *segcxt, WALSegmentOpen openSegment, + WALReadError *errinfo); + /* Functions for decoding an XLogRecord */ extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index 2df98e45b2..0572b24192 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -54,4 +54,6 @@ extern int read_local_xlog_page(XLogReaderState *state, extern void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength); +extern void WALReadRaiseError(WALReadError *errinfo); + #endif