diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 477709bbc2..546bd43ce8 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1377,7 +1377,6 @@ ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed) * * Note clearly that this function can access WAL during normal operation, * similarly to the way WALSender or Logical Decoding would do. - * */ static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) @@ -1386,8 +1385,8 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) XLogReaderState *xlogreader; char *errormsg; - xlogreader = XLogReaderAllocate(wal_segment_size, &read_local_xlog_page, - NULL); + xlogreader = XLogReaderAllocate(wal_segment_size, NULL, + &read_local_xlog_page, NULL); if (!xlogreader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 501f46fd52..6c69eb6dd7 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -885,8 +885,7 @@ static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, int source, bool notfoundOk); static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source); static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *readBuf, - TimeLineID *readTLI); + int reqLen, XLogRecPtr targetRecPtr, char *readBuf); static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, bool fetching_ckpt, XLogRecPtr tliRecPtr); static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr); @@ -1195,7 +1194,8 @@ XLogInsertRecord(XLogRecData *rdata, appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len); if (!debug_reader) - debug_reader = XLogReaderAllocate(wal_segment_size, NULL, NULL); + debug_reader = XLogReaderAllocate(wal_segment_size, NULL, + NULL, NULL); if (!debug_reader) { @@ -4296,7 +4296,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode, XLByteToSeg(xlogreader->latestPagePtr, segno, wal_segment_size); offset = XLogSegmentOffset(xlogreader->latestPagePtr, wal_segment_size); - XLogFileName(fname, xlogreader->readPageTLI, segno, + XLogFileName(fname, xlogreader->seg.ws_tli, segno, wal_segment_size); ereport(emode_for_corrupt_record(emode, RecPtr ? RecPtr : EndRecPtr), @@ -6353,7 +6353,8 @@ StartupXLOG(void) /* Set up XLOG reader facility */ MemSet(&private, 0, sizeof(XLogPageReadPrivate)); - xlogreader = XLogReaderAllocate(wal_segment_size, &XLogPageRead, &private); + xlogreader = XLogReaderAllocate(wal_segment_size, NULL, + &XLogPageRead, &private); if (!xlogreader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), @@ -7355,7 +7356,7 @@ StartupXLOG(void) * and we were reading the old WAL from a segment belonging to a higher * timeline. */ - EndOfLogTLI = xlogreader->readPageTLI; + EndOfLogTLI = xlogreader->seg.ws_tli; /* * Complain if we did not roll forward far enough to render the backup @@ -11523,7 +11524,7 @@ CancelBackup(void) */ static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, - XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI) + XLogRecPtr targetRecPtr, char *readBuf) { XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data; @@ -11640,7 +11641,7 @@ retry: Assert(targetPageOff == readOff); Assert(reqLen <= readLen); - *readTLI = curFileTLI; + xlogreader->seg.ws_tli = curFileTLI; /* * Check the page header immediately, so that we can retry immediately if diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index a66e3324b1..27c27303d6 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -68,8 +68,8 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...) * Returns NULL if the xlogreader couldn't be allocated. */ XLogReaderState * -XLogReaderAllocate(int wal_segment_size, XLogPageReadCB pagereadfunc, - void *private_data) +XLogReaderAllocate(int wal_segment_size, const char *waldir, + XLogPageReadCB pagereadfunc, void *private_data) { XLogReaderState *state; @@ -96,7 +96,10 @@ XLogReaderAllocate(int wal_segment_size, XLogPageReadCB pagereadfunc, return NULL; } - state->wal_segment_size = wal_segment_size; + /* Initialize segment info. */ + WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size, + waldir); + state->read_page = pagereadfunc; /* system_identifier initialized to zeroes above */ state->private_data = private_data; @@ -198,6 +201,23 @@ allocate_recordbuf(XLogReaderState *state, uint32 reclength) return true; } +/* + * Initialize the passed segment structs. + */ +void +WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, + int segsize, const char *waldir) +{ + seg->ws_file = -1; + seg->ws_segno = 0; + seg->ws_off = 0; + seg->ws_tli = 0; + + segcxt->ws_segsize = segsize; + if (waldir) + snprintf(segcxt->ws_dir, MAXPGPATH, "%s", waldir); +} + /* * Attempt to read an XLOG record. * @@ -490,8 +510,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) (record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH) { /* Pretend it extends to end of segment */ - state->EndRecPtr += state->wal_segment_size - 1; - state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->wal_segment_size); + state->EndRecPtr += state->segcxt.ws_segsize - 1; + state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize); } if (DecodeXLogRecord(state, record, errormsg)) @@ -533,12 +553,12 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) Assert((pageptr % XLOG_BLCKSZ) == 0); - XLByteToSeg(pageptr, targetSegNo, state->wal_segment_size); - targetPageOff = XLogSegmentOffset(pageptr, state->wal_segment_size); + XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize); + targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize); /* check whether we have all the requested data already */ - if (targetSegNo == state->readSegNo && targetPageOff == state->readOff && - reqLen <= state->readLen) + if (targetSegNo == state->seg.ws_segno && + targetPageOff == state->seg.ws_off && reqLen <= state->readLen) return state->readLen; /* @@ -553,13 +573,13 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) * record is. This is so that we can check the additional identification * info that is present in the first page's "long" header. */ - if (targetSegNo != state->readSegNo && targetPageOff != 0) + if (targetSegNo != state->seg.ws_segno && targetPageOff != 0) { XLogRecPtr targetSegmentPtr = pageptr - targetPageOff; readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ, state->currRecPtr, - state->readBuf, &state->readPageTLI); + state->readBuf); if (readLen < 0) goto err; @@ -577,7 +597,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) */ readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD), state->currRecPtr, - state->readBuf, &state->readPageTLI); + state->readBuf); if (readLen < 0) goto err; @@ -596,7 +616,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) { readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr), state->currRecPtr, - state->readBuf, &state->readPageTLI); + state->readBuf); if (readLen < 0) goto err; } @@ -608,8 +628,8 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) goto err; /* update read state information */ - state->readSegNo = targetSegNo; - state->readOff = targetPageOff; + state->seg.ws_segno = targetSegNo; + state->seg.ws_off = targetPageOff; state->readLen = readLen; return readLen; @@ -625,8 +645,8 @@ err: static void XLogReaderInvalReadState(XLogReaderState *state) { - state->readSegNo = 0; - state->readOff = 0; + state->seg.ws_segno = 0; + state->seg.ws_off = 0; state->readLen = 0; } @@ -745,16 +765,16 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, Assert((recptr % XLOG_BLCKSZ) == 0); - XLByteToSeg(recptr, segno, state->wal_segment_size); - offset = XLogSegmentOffset(recptr, state->wal_segment_size); + XLByteToSeg(recptr, segno, state->segcxt.ws_segsize); + offset = XLogSegmentOffset(recptr, state->segcxt.ws_segsize); - XLogSegNoOffsetToRecPtr(segno, offset, state->wal_segment_size, recaddr); + XLogSegNoOffsetToRecPtr(segno, offset, state->segcxt.ws_segsize, recaddr); if (hdr->xlp_magic != XLOG_PAGE_MAGIC) { char fname[MAXFNAMELEN]; - XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size); + XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize); report_invalid_record(state, "invalid magic number %04X in log segment %s, offset %u", @@ -768,7 +788,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, { char fname[MAXFNAMELEN]; - XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size); + XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize); report_invalid_record(state, "invalid info bits %04X in log segment %s, offset %u", @@ -791,7 +811,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, (unsigned long long) state->system_identifier); return false; } - else if (longhdr->xlp_seg_size != state->wal_segment_size) + else if (longhdr->xlp_seg_size != state->segcxt.ws_segsize) { report_invalid_record(state, "WAL file is from different database system: incorrect segment size in page header"); @@ -808,7 +828,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, { char fname[MAXFNAMELEN]; - XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size); + XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize); /* hmm, first page of file doesn't have a long header? */ report_invalid_record(state, @@ -828,7 +848,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, { char fname[MAXFNAMELEN]; - XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size); + XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize); report_invalid_record(state, "unexpected pageaddr %X/%X in log segment %s, offset %u", @@ -853,7 +873,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, { char fname[MAXFNAMELEN]; - XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size); + XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize); report_invalid_record(state, "out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u", @@ -997,7 +1017,6 @@ out: #endif /* FRONTEND */ - /* ---------------------------------------- * Functions for decoding the data and block references in a record. * ---------------------------------------- diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 1fc39333f1..5f1e5ba75d 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -802,8 +802,8 @@ XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr, void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength) { - const XLogRecPtr lastReadPage = state->readSegNo * - state->wal_segment_size + state->readOff; + const XLogRecPtr lastReadPage = state->seg.ws_segno * + state->segcxt.ws_segsize + state->seg.ws_off; Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0); Assert(wantLength <= XLOG_BLCKSZ); @@ -847,8 +847,8 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa if (state->currTLIValidUntil != InvalidXLogRecPtr && state->currTLI != ThisTimeLineID && state->currTLI != 0 && - ((wantPage + wantLength) / state->wal_segment_size) < - (state->currTLIValidUntil / state->wal_segment_size)) + ((wantPage + wantLength) / state->segcxt.ws_segsize) < + (state->currTLIValidUntil / state->segcxt.ws_segsize)) return; /* @@ -869,12 +869,12 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa * by a promotion or replay from a cascaded replica. */ List *timelineHistory = readTimeLineHistory(ThisTimeLineID); + XLogRecPtr endOfSegment; - XLogRecPtr endOfSegment = (((wantPage / state->wal_segment_size) + 1) - * state->wal_segment_size) - 1; - - Assert(wantPage / state->wal_segment_size == - endOfSegment / state->wal_segment_size); + endOfSegment = ((wantPage / state->segcxt.ws_segsize) + 1) * + state->segcxt.ws_segsize - 1; + Assert(wantPage / state->segcxt.ws_segsize == + endOfSegment / state->segcxt.ws_segsize); /* * Find the timeline of the last LSN on the segment containing @@ -909,8 +909,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa */ int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *cur_page, - TimeLineID *pageTLI) + int reqLen, XLogRecPtr targetRecPtr, char *cur_page) { XLogRecPtr read_upto, loc; @@ -933,8 +932,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, read_upto = GetFlushRecPtr(); else read_upto = GetXLogReplayRecPtr(&ThisTimeLineID); - - *pageTLI = ThisTimeLineID; + state->seg.ws_tli = ThisTimeLineID; /* * Check which timeline to get the record from. @@ -984,14 +982,14 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, read_upto = state->currTLIValidUntil; /* - * Setting pageTLI to our wanted record's TLI is slightly wrong; + * Setting ws_tli to our wanted record's TLI is slightly wrong; * the page might begin on an older timeline if it contains a * timeline switch, since its xlog segment will have been copied * from the prior timeline. This is pretty harmless though, as * nothing cares so long as the timeline doesn't go backwards. We * should read the page header instead; FIXME someday. */ - *pageTLI = state->currTLI; + state->seg.ws_tli = state->currTLI; /* No need to wait on a historical timeline */ break; @@ -1022,7 +1020,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, * as 'count', read the whole page anyway. It's guaranteed to be * zero-padded up to the page boundary if it's incomplete. */ - XLogRead(cur_page, state->wal_segment_size, *pageTLI, targetPagePtr, + XLogRead(cur_page, state->segcxt.ws_segsize, state->seg.ws_tli, targetPagePtr, XLOG_BLCKSZ); /* number of valid bytes in the buffer */ diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index f8b9020081..da265f5294 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -173,7 +173,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->slot = slot; - ctx->reader = XLogReaderAllocate(wal_segment_size, read_page, ctx); + ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, read_page, ctx); if (!ctx->reader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index d974400d6e..d1cf80d441 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -116,10 +116,10 @@ check_permissions(void) int logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI) + int reqLen, XLogRecPtr targetRecPtr, char *cur_page) { return read_local_xlog_page(state, targetPagePtr, reqLen, - targetRecPtr, cur_page, pageTLI); + targetRecPtr, cur_page); } /* diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 23870a25a5..eb4a98cc91 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -128,16 +128,8 @@ bool log_replication_commands = false; */ bool wake_wal_senders = false; -/* - * These variables are used similarly to openLogFile/SegNo/Off, - * but for walsender to read the XLOG. - */ -static int sendFile = -1; -static XLogSegNo sendSegNo = 0; -static uint32 sendOff = 0; - -/* Timeline ID of the currently open file */ -static TimeLineID curFileTimeLine = 0; +static WALOpenSegment *sendSeg = NULL; +static WALSegmentContext *sendCxt = NULL; /* * These variables keep track of the state of the timeline we're currently @@ -256,7 +248,7 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time); static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now); static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); -static void XLogRead(char *buf, XLogRecPtr startptr, Size count); +static void XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count); /* Initialize walsender process before entering the main command loop */ @@ -285,6 +277,13 @@ InitWalSender(void) /* Initialize empty timestamp buffer for lag tracking. */ lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker)); + + /* Make sure we can remember the current read position in XLOG. */ + sendSeg = (WALOpenSegment *) + MemoryContextAlloc(TopMemoryContext, sizeof(WALOpenSegment)); + sendCxt = (WALSegmentContext *) + MemoryContextAlloc(TopMemoryContext, sizeof(WALSegmentContext)); + WALOpenSegmentInit(sendSeg, sendCxt, wal_segment_size, NULL); } /* @@ -301,10 +300,10 @@ WalSndErrorCleanup(void) ConditionVariableCancelSleep(); pgstat_report_wait_end(); - if (sendFile >= 0) + if (sendSeg->ws_file >= 0) { - close(sendFile); - sendFile = -1; + close(sendSeg->ws_file); + sendSeg->ws_file = -1; } if (MyReplicationSlot != NULL) @@ -763,7 +762,7 @@ StartReplication(StartReplicationCmd *cmd) */ static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, - XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI) + XLogRecPtr targetRecPtr, char *cur_page) { XLogRecPtr flushptr; int count; @@ -787,7 +786,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req count = flushptr - targetPagePtr; /* part of the page available */ /* now actually read the data, we know it's there */ - XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ); + XLogRead(sendCxt, cur_page, targetPagePtr, XLOG_BLCKSZ); return count; } @@ -2364,7 +2363,7 @@ WalSndKill(int code, Datum arg) * more than one. */ static void -XLogRead(char *buf, XLogRecPtr startptr, Size count) +XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count) { char *p; XLogRecPtr recptr; @@ -2382,17 +2381,18 @@ retry: int segbytes; int readbytes; - startoff = XLogSegmentOffset(recptr, wal_segment_size); + startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize); - if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, wal_segment_size)) + if (sendSeg->ws_file < 0 || + !XLByteInSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize)) { char path[MAXPGPATH]; /* Switch to another logfile segment */ - if (sendFile >= 0) - close(sendFile); + if (sendSeg->ws_file >= 0) + close(sendSeg->ws_file); - XLByteToSeg(recptr, sendSegNo, wal_segment_size); + XLByteToSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize); /*------- * When reading from a historic timeline, and there is a timeline @@ -2420,20 +2420,20 @@ retry: * used portion of the old segment is copied to the new file. *------- */ - curFileTimeLine = sendTimeLine; + sendSeg->ws_tli = sendTimeLine; if (sendTimeLineIsHistoric) { XLogSegNo endSegNo; - XLByteToSeg(sendTimeLineValidUpto, endSegNo, wal_segment_size); - if (sendSegNo == endSegNo) - curFileTimeLine = sendTimeLineNextTLI; + XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize); + if (sendSeg->ws_segno == endSegNo) + sendSeg->ws_tli = sendTimeLineNextTLI; } - XLogFilePath(path, curFileTimeLine, sendSegNo, wal_segment_size); + XLogFilePath(path, sendSeg->ws_tli, sendSeg->ws_segno, segcxt->ws_segsize); - sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY); - if (sendFile < 0) + sendSeg->ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY); + if (sendSeg->ws_file < 0) { /* * If the file is not found, assume it's because the standby @@ -2444,58 +2444,58 @@ retry: ereport(ERROR, (errcode_for_file_access(), errmsg("requested WAL segment %s has already been removed", - XLogFileNameP(curFileTimeLine, sendSegNo)))); + XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno)))); else ereport(ERROR, (errcode_for_file_access(), errmsg("could not open file \"%s\": %m", path))); } - sendOff = 0; + sendSeg->ws_off = 0; } /* Need to seek in the file? */ - if (sendOff != startoff) + if (sendSeg->ws_off != startoff) { - if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0) + if (lseek(sendSeg->ws_file, (off_t) startoff, SEEK_SET) < 0) ereport(ERROR, (errcode_for_file_access(), errmsg("could not seek in log segment %s to offset %u: %m", - XLogFileNameP(curFileTimeLine, sendSegNo), + XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno), startoff))); - sendOff = startoff; + sendSeg->ws_off = startoff; } /* How many bytes are within this segment? */ - if (nbytes > (wal_segment_size - startoff)) - segbytes = wal_segment_size - startoff; + if (nbytes > (segcxt->ws_segsize - startoff)) + segbytes = segcxt->ws_segsize - startoff; else segbytes = nbytes; pgstat_report_wait_start(WAIT_EVENT_WAL_READ); - readbytes = read(sendFile, p, segbytes); + readbytes = read(sendSeg->ws_file, p, segbytes); pgstat_report_wait_end(); if (readbytes < 0) { ereport(ERROR, (errcode_for_file_access(), errmsg("could not read from log segment %s, offset %u, length %zu: %m", - XLogFileNameP(curFileTimeLine, sendSegNo), - sendOff, (Size) segbytes))); + XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno), + sendSeg->ws_off, (Size) segbytes))); } else if (readbytes == 0) { ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), errmsg("could not read from log segment %s, offset %u: read %d of %zu", - XLogFileNameP(curFileTimeLine, sendSegNo), - sendOff, readbytes, (Size) segbytes))); + XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno), + sendSeg->ws_off, readbytes, (Size) segbytes))); } /* Update state for read */ recptr += readbytes; - sendOff += readbytes; + sendSeg->ws_off += readbytes; nbytes -= readbytes; p += readbytes; } @@ -2507,7 +2507,7 @@ retry: * read() succeeds in that case, but the data we tried to read might * already have been overwritten with new WAL records. */ - XLByteToSeg(startptr, segno, wal_segment_size); + XLByteToSeg(startptr, segno, segcxt->ws_segsize); CheckXLogRemoved(segno, ThisTimeLineID); /* @@ -2526,10 +2526,10 @@ retry: walsnd->needreload = false; SpinLockRelease(&walsnd->mutex); - if (reload && sendFile >= 0) + if (reload && sendSeg->ws_file >= 0) { - close(sendFile); - sendFile = -1; + close(sendSeg->ws_file); + sendSeg->ws_file = -1; goto retry; } @@ -2695,9 +2695,9 @@ XLogSendPhysical(void) if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr) { /* close the current file. */ - if (sendFile >= 0) - close(sendFile); - sendFile = -1; + if (sendSeg->ws_file >= 0) + close(sendSeg->ws_file); + sendSeg->ws_file = -1; /* Send CopyDone */ pq_putmessage_noblock('c', NULL, 0); @@ -2768,7 +2768,7 @@ XLogSendPhysical(void) * calls. */ enlargeStringInfo(&output_message, nbytes); - XLogRead(&output_message.data[output_message.len], startptr, nbytes); + XLogRead(sendCxt, &output_message.data[output_message.len], startptr, nbytes); output_message.len += nbytes; output_message.data[output_message.len] = '\0'; diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 63c3879ead..264a8f4db5 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -43,14 +43,12 @@ static char xlogfpath[MAXPGPATH]; typedef struct XLogPageReadPrivate { - const char *datadir; int tliIndex; } XLogPageReadPrivate; static int SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *readBuf, - TimeLineID *pageTLI); + int reqLen, XLogRecPtr targetRecPtr, char *readBuf); /* * Read WAL from the datadir/pg_wal, starting from 'startpoint' on timeline @@ -66,9 +64,8 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex, char *errormsg; XLogPageReadPrivate private; - private.datadir = datadir; private.tliIndex = tliIndex; - xlogreader = XLogReaderAllocate(WalSegSz, &SimpleXLogPageRead, + xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead, &private); if (xlogreader == NULL) pg_fatal("out of memory"); @@ -119,9 +116,8 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex) XLogPageReadPrivate private; XLogRecPtr endptr; - private.datadir = datadir; private.tliIndex = tliIndex; - xlogreader = XLogReaderAllocate(WalSegSz, &SimpleXLogPageRead, + xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead, &private); if (xlogreader == NULL) pg_fatal("out of memory"); @@ -177,9 +173,8 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, forkptr += SizeOfXLogShortPHD; } - private.datadir = datadir; private.tliIndex = tliIndex; - xlogreader = XLogReaderAllocate(WalSegSz, &SimpleXLogPageRead, + xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead, &private); if (xlogreader == NULL) pg_fatal("out of memory"); @@ -237,8 +232,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, /* XLogReader callback function, to read a WAL page */ static int SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *readBuf, - TimeLineID *pageTLI) + int reqLen, XLogRecPtr targetRecPtr, char *readBuf) { XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data; uint32 targetPageOff; @@ -283,7 +277,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, XLogFileName(xlogfname, targetHistory[private->tliIndex].tli, xlogreadsegno, WalSegSz); - snprintf(xlogfpath, MAXPGPATH, "%s/" XLOGDIR "/%s", private->datadir, xlogfname); + snprintf(xlogfpath, MAXPGPATH, "%s/" XLOGDIR "/%s", + xlogreader->segcxt.ws_dir, xlogfname); xlogreadfd = open(xlogfpath, O_RDONLY | PG_BINARY, 0); @@ -321,7 +316,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, Assert(targetSegNo == xlogreadsegno); - *pageTLI = targetHistory[private->tliIndex].tli; + xlogreader->seg.ws_tli = targetHistory[private->tliIndex].tli; return XLOG_BLCKSZ; } diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index b95d467805..b79208cd73 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -33,7 +33,6 @@ static int WalSegSz; typedef struct XLogDumpPrivate { TimeLineID timeline; - char *inpath; XLogRecPtr startptr; XLogRecPtr endptr; bool endptr_reached; @@ -224,7 +223,7 @@ search_directory(const char *directory, const char *fname) } /* - * Identify the target directory and set WalSegSz. + * Identify the target directory. * * Try to find the file in several places: * if directory != NULL: @@ -235,29 +234,22 @@ search_directory(const char *directory, const char *fname) * XLOGDIR / * $PGDATA / XLOGDIR / * - * Set the valid target directory in private->inpath. + * The valid target directory is returned. */ -static void -identify_target_directory(XLogDumpPrivate *private, char *directory, - char *fname) +static char * +identify_target_directory(char *directory, char *fname) { char fpath[MAXPGPATH]; if (directory != NULL) { if (search_directory(directory, fname)) - { - private->inpath = pg_strdup(directory); - return; - } + return pg_strdup(directory); /* directory / XLOGDIR */ snprintf(fpath, MAXPGPATH, "%s/%s", directory, XLOGDIR); if (search_directory(fpath, fname)) - { - private->inpath = pg_strdup(fpath); - return; - } + return pg_strdup(fpath); } else { @@ -265,16 +257,10 @@ identify_target_directory(XLogDumpPrivate *private, char *directory, /* current directory */ if (search_directory(".", fname)) - { - private->inpath = pg_strdup("."); - return; - } + return pg_strdup("."); /* XLOGDIR */ if (search_directory(XLOGDIR, fname)) - { - private->inpath = pg_strdup(XLOGDIR); - return; - } + return pg_strdup(XLOGDIR); datadir = getenv("PGDATA"); /* $PGDATA / XLOGDIR */ @@ -282,10 +268,7 @@ identify_target_directory(XLogDumpPrivate *private, char *directory, { snprintf(fpath, MAXPGPATH, "%s/%s", datadir, XLOGDIR); if (search_directory(fpath, fname)) - { - private->inpath = pg_strdup(fpath); - return; - } + return pg_strdup(fpath); } } @@ -294,6 +277,8 @@ identify_target_directory(XLogDumpPrivate *private, char *directory, fatal_error("could not locate WAL file \"%s\"", fname); else fatal_error("could not find any WAL file"); + + return NULL; /* not reached */ } /* @@ -423,7 +408,7 @@ XLogDumpXLogRead(const char *directory, TimeLineID timeline_id, */ static int XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, - XLogRecPtr targetPtr, char *readBuff, TimeLineID *curFileTLI) + XLogRecPtr targetPtr, char *readBuff) { XLogDumpPrivate *private = state->private_data; int count = XLOG_BLCKSZ; @@ -441,7 +426,7 @@ XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, } } - XLogDumpXLogRead(private->inpath, private->timeline, targetPagePtr, + XLogDumpXLogRead(state->segcxt.ws_dir, private->timeline, targetPagePtr, readBuff, count); return count; @@ -820,6 +805,7 @@ main(int argc, char **argv) XLogDumpStats stats; XLogRecord *record; XLogRecPtr first_record; + char *waldir = NULL; char *errormsg; static struct option long_options[] = { @@ -912,7 +898,7 @@ main(int argc, char **argv) } break; case 'p': - private.inpath = pg_strdup(optarg); + waldir = pg_strdup(optarg); break; case 'r': { @@ -994,13 +980,13 @@ main(int argc, char **argv) goto bad_argument; } - if (private.inpath != NULL) + if (waldir != NULL) { /* validate path points to directory */ - if (!verify_directory(private.inpath)) + if (!verify_directory(waldir)) { pg_log_error("path \"%s\" could not be opened: %s", - private.inpath, strerror(errno)); + waldir, strerror(errno)); goto bad_argument; } } @@ -1015,17 +1001,17 @@ main(int argc, char **argv) split_path(argv[optind], &directory, &fname); - if (private.inpath == NULL && directory != NULL) + if (waldir == NULL && directory != NULL) { - private.inpath = directory; + waldir = directory; - if (!verify_directory(private.inpath)) + if (!verify_directory(waldir)) fatal_error("could not open directory \"%s\": %s", - private.inpath, strerror(errno)); + waldir, strerror(errno)); } - identify_target_directory(&private, private.inpath, fname); - fd = open_file_in_directory(private.inpath, fname); + waldir = identify_target_directory(waldir, fname); + fd = open_file_in_directory(waldir, fname); if (fd < 0) fatal_error("could not open file \"%s\"", fname); close(fd); @@ -1056,7 +1042,7 @@ main(int argc, char **argv) /* ignore directory, already have that */ split_path(argv[optind + 1], &directory, &fname); - fd = open_file_in_directory(private.inpath, fname); + fd = open_file_in_directory(waldir, fname); if (fd < 0) fatal_error("could not open file \"%s\"", fname); close(fd); @@ -1088,7 +1074,7 @@ main(int argc, char **argv) } } else - identify_target_directory(&private, private.inpath, NULL); + waldir = identify_target_directory(waldir, NULL); /* we don't know what to print */ if (XLogRecPtrIsInvalid(private.startptr)) @@ -1100,7 +1086,7 @@ main(int argc, char **argv) /* done with argument parsing, do the actual work */ /* we have everything we need, start reading */ - xlogreader_state = XLogReaderAllocate(WalSegSz, XLogDumpReadPage, + xlogreader_state = XLogReaderAllocate(WalSegSz, waldir, XLogDumpReadPage, &private); if (!xlogreader_state) fatal_error("out of memory"); diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 735b1bd2fd..1bbee386e8 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -31,6 +31,22 @@ #include "access/xlogrecord.h" +/* WALOpenSegment represents a WAL segment being read. */ +typedef struct WALOpenSegment +{ + int ws_file; /* segment file descriptor */ + XLogSegNo ws_segno; /* segment number */ + uint32 ws_off; /* offset in the segment */ + TimeLineID ws_tli; /* timeline ID of the currently open file */ +} WALOpenSegment; + +/* WALSegmentContext carries context information about WAL segments to read */ +typedef struct WALSegmentContext +{ + char ws_dir[MAXPGPATH]; + int ws_segsize; +} WALSegmentContext; + typedef struct XLogReaderState XLogReaderState; /* Function type definition for the read_page callback */ @@ -38,8 +54,7 @@ typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, - char *readBuf, - TimeLineID *pageTLI); + char *readBuf); typedef struct { @@ -77,11 +92,6 @@ struct XLogReaderState * ---------------------------------------- */ - /* - * Segment size of the to-be-parsed data (mandatory). - */ - int wal_segment_size; - /* * Data input callback (mandatory). * @@ -99,9 +109,8 @@ struct XLogReaderState * actual WAL record it's interested in. In that case, targetRecPtr can * be used to determine which timeline to read the page from. * - * The callback shall set *pageTLI to the TLI of the file the page was - * read from. It is currently used only for error reporting purposes, to - * reconstruct the name of the WAL file where an error occurred. + * The callback shall set ->seg.ws_tli to the TLI of the file the page was + * read from. */ XLogPageReadCB read_page; @@ -156,10 +165,9 @@ struct XLogReaderState char *readBuf; uint32 readLen; - /* last read segment, segment offset, TLI for data currently in readBuf */ - XLogSegNo readSegNo; - uint32 readOff; - TimeLineID readPageTLI; + /* last read XLOG position for data currently in readBuf */ + WALSegmentContext segcxt; + WALOpenSegment seg; /* * beginning of prior page read, and its TLI. Doesn't necessarily @@ -202,12 +210,17 @@ struct XLogReaderState /* Get a new XLogReader */ extern XLogReaderState *XLogReaderAllocate(int wal_segment_size, + const char *waldir, XLogPageReadCB pagereadfunc, void *private_data); /* Free an XLogReader */ extern void XLogReaderFree(XLogReaderState *state); +/* Initialize supporting structures */ +extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, + int segsize, const char *waldir); + /* Read the next XLog record. Returns NULL on end-of-WAL or failure */ extern struct XLogRecord *XLogReadRecord(XLogReaderState *state, XLogRecPtr recptr, char **errormsg); diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index 4105b59904..2df98e45b2 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -49,8 +49,7 @@ extern void FreeFakeRelcacheEntry(Relation fakerel); extern int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, - XLogRecPtr targetRecPtr, char *cur_page, - TimeLineID *pageTLI); + XLogRecPtr targetRecPtr, char *cur_page); extern void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength); diff --git a/src/include/replication/logicalfuncs.h b/src/include/replication/logicalfuncs.h index a9c178a9e6..012096f183 100644 --- a/src/include/replication/logicalfuncs.h +++ b/src/include/replication/logicalfuncs.h @@ -14,6 +14,6 @@ extern int logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, - char *cur_page, TimeLineID *pageTLI); + char *cur_page); #endif