diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 7cee8b92c9..aae3fee24c 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -1044,14 +1044,12 @@ err: /* * Helper function to ease writing of XLogRoutine->page_read callbacks. - * If this function is used, caller must supply an open_segment callback in + * If this function is used, caller must supply a segment_open callback in * 'state', as that is used here. * * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL * fetched from timeline 'tli'. * - * 'seg/segcxt' identify the last segment used. - * * Returns true if succeeded, false if an error occurs, in which case * 'errinfo' receives error details. * @@ -1061,7 +1059,6 @@ err: bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, - WALOpenSegment *seg, WALSegmentContext *segcxt, WALReadError *errinfo) { char *p; @@ -1078,34 +1075,36 @@ WALRead(XLogReaderState *state, int segbytes; int readbytes; - startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize); + startoff = XLogSegmentOffset(recptr, state->segcxt.ws_segsize); /* * If the data we want is not in a segment we have open, close what we * have (if anything) and open the next one, using the caller's * provided openSegment callback. */ - if (seg->ws_file < 0 || - !XLByteInSeg(recptr, seg->ws_segno, segcxt->ws_segsize) || - tli != seg->ws_tli) + if (state->seg.ws_file < 0 || + !XLByteInSeg(recptr, state->seg.ws_segno, state->segcxt.ws_segsize) || + tli != state->seg.ws_tli) { XLogSegNo nextSegNo; - if (seg->ws_file >= 0) + if (state->seg.ws_file >= 0) state->routine.segment_close(state); - XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize); - seg->ws_file = state->routine.segment_open(state, nextSegNo, - segcxt, &tli); + XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize); + state->routine.segment_open(state, nextSegNo, &tli); + + /* This shouldn't happen -- indicates a bug in segment_open */ + Assert(state->seg.ws_file >= 0); /* Update the current segment info. */ - seg->ws_tli = tli; - seg->ws_segno = nextSegNo; + state->seg.ws_tli = tli; + state->seg.ws_segno = nextSegNo; } /* How many bytes are within this segment? */ - if (nbytes > (segcxt->ws_segsize - startoff)) - segbytes = segcxt->ws_segsize - startoff; + if (nbytes > (state->segcxt.ws_segsize - startoff)) + segbytes = state->segcxt.ws_segsize - startoff; else segbytes = nbytes; @@ -1115,7 +1114,7 @@ WALRead(XLogReaderState *state, /* Reset errno first; eases reporting non-errno-affecting errors */ errno = 0; - readbytes = pg_pread(seg->ws_file, p, segbytes, (off_t) startoff); + readbytes = pg_pread(state->seg.ws_file, p, segbytes, (off_t) startoff); #ifndef FRONTEND pgstat_report_wait_end(); @@ -1127,7 +1126,7 @@ WALRead(XLogReaderState *state, errinfo->wre_req = segbytes; errinfo->wre_read = readbytes; errinfo->wre_off = startoff; - errinfo->wre_seg = *seg; + errinfo->wre_seg = state->seg; return false; } diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 0bb69447c2..322b0e8ff5 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -784,18 +784,17 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa } /* XLogReaderRoutine->segment_open callback for local pg_wal files */ -int +void wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, - WALSegmentContext *segcxt, TimeLineID *tli_p) + TimeLineID *tli_p) { TimeLineID tli = *tli_p; char path[MAXPGPATH]; - int fd; - XLogFilePath(path, tli, nextSegNo, segcxt->ws_segsize); - fd = BasicOpenFile(path, O_RDONLY | PG_BINARY); - if (fd >= 0) - return fd; + XLogFilePath(path, tli, nextSegNo, state->segcxt.ws_segsize); + state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY); + if (state->seg.ws_file >= 0) + return; if (errno == ENOENT) ereport(ERROR, @@ -807,8 +806,6 @@ wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, (errcode_for_file_access(), errmsg("could not open file \"%s\": %m", path))); - - return -1; /* keep compiler quiet */ } /* stock XLogReaderRoutine->segment_close callback */ @@ -947,7 +944,6 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, * zero-padded up to the page boundary if it's incomplete. */ if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli, - &state->seg, &state->segcxt, &errinfo)) WALReadRaiseError(&errinfo); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 9f14b99231..3367aa98f8 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -129,8 +129,14 @@ bool log_replication_commands = false; */ bool wake_wal_senders = false; -static WALOpenSegment *sendSeg = NULL; -static WALSegmentContext *sendCxt = NULL; +/* + * Physical walsender does not use xlogreader to read WAL, but it does use a + * fake one to keep state. Logical walsender uses a proper xlogreader. Both + * keep the 'xlogreader' pointer to the right one, for the sake of common + * routines. + */ +static XLogReaderState fake_xlogreader; +static XLogReaderState *xlogreader; /* * These variables keep track of the state of the timeline we're currently @@ -248,8 +254,8 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time); static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now); static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); -static int WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, - WALSegmentContext *segcxt, TimeLineID *tli_p); +static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, + TimeLineID *tli_p); static void UpdateSpillStats(LogicalDecodingContext *ctx); @@ -280,12 +286,19 @@ InitWalSender(void) /* Initialize empty timestamp buffer for lag tracking. */ lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker)); - /* Make sure we can remember the current read position in XLOG. */ - sendSeg = (WALOpenSegment *) - MemoryContextAlloc(TopMemoryContext, sizeof(WALOpenSegment)); - sendCxt = (WALSegmentContext *) - MemoryContextAlloc(TopMemoryContext, sizeof(WALSegmentContext)); - WALOpenSegmentInit(sendSeg, sendCxt, wal_segment_size, NULL); + /* + * Prepare physical walsender's fake xlogreader struct. Logical walsender + * does this later. + */ + if (!am_db_walsender) + { + xlogreader = &fake_xlogreader; + xlogreader->routine = + *XL_ROUTINE(.segment_open = WalSndSegmentOpen, + .segment_close = wal_segment_close); + WALOpenSegmentInit(&xlogreader->seg, &xlogreader->segcxt, + wal_segment_size, NULL); + } } /* @@ -302,11 +315,8 @@ WalSndErrorCleanup(void) ConditionVariableCancelSleep(); pgstat_report_wait_end(); - if (sendSeg->ws_file >= 0) - { - close(sendSeg->ws_file); - sendSeg->ws_file = -1; - } + if (xlogreader->seg.ws_file >= 0) + wal_segment_close(xlogreader); if (MyReplicationSlot != NULL) ReplicationSlotRelease(); @@ -837,11 +847,9 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req cur_page, targetPagePtr, XLOG_BLCKSZ, - sendSeg->ws_tli, /* Pass the current TLI because only + state->seg.ws_tli, /* Pass the current TLI because only * WalSndSegmentOpen controls whether new * TLI is needed. */ - sendSeg, - sendCxt, &errinfo)) WALReadRaiseError(&errinfo); @@ -852,8 +860,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req * read() succeeds in that case, but the data we tried to read might * already have been overwritten with new WAL records. */ - XLByteToSeg(targetPagePtr, segno, sendCxt->ws_segsize); - CheckXLogRemoved(segno, sendSeg->ws_tli); + XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize); + CheckXLogRemoved(segno, state->seg.ws_tli); return count; } @@ -1176,6 +1184,7 @@ StartLogicalReplication(StartReplicationCmd *cmd) .segment_close = wal_segment_close), WalSndPrepareWrite, WalSndWriteData, WalSndUpdateProgress); + xlogreader = logical_decoding_ctx->reader; WalSndSetState(WALSNDSTATE_CATCHUP); @@ -2447,13 +2456,11 @@ WalSndKill(int code, Datum arg) } /* XLogReaderRoutine->segment_open callback */ -static int -WalSndSegmentOpen(XLogReaderState *state, - XLogSegNo nextSegNo, WALSegmentContext *segcxt, +static void +WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p) { char path[MAXPGPATH]; - int fd; /*------- * When reading from a historic timeline, and there is a timeline switch @@ -2484,15 +2491,15 @@ WalSndSegmentOpen(XLogReaderState *state, { XLogSegNo endSegNo; - XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize); - if (sendSeg->ws_segno == endSegNo) + XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize); + if (state->seg.ws_segno == endSegNo) *tli_p = sendTimeLineNextTLI; } - XLogFilePath(path, *tli_p, nextSegNo, segcxt->ws_segsize); - fd = BasicOpenFile(path, O_RDONLY | PG_BINARY); - if (fd >= 0) - return fd; + XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize); + state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY); + if (state->seg.ws_file >= 0) + return; /* * If the file is not found, assume it's because the standby asked for a @@ -2515,7 +2522,6 @@ WalSndSegmentOpen(XLogReaderState *state, (errcode_for_file_access(), errmsg("could not open file \"%s\": %m", path))); - return -1; /* keep compiler quiet */ } /* @@ -2537,12 +2543,6 @@ XLogSendPhysical(void) Size nbytes; XLogSegNo segno; WALReadError errinfo; - static XLogReaderState fake_xlogreader = - { - /* Fake xlogreader state for WALRead */ - .routine.segment_open = WalSndSegmentOpen, - .routine.segment_close = wal_segment_close - }; /* If requested switch the WAL sender to the stopping state. */ if (got_STOPPING) @@ -2685,9 +2685,8 @@ XLogSendPhysical(void) if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr) { /* close the current file. */ - if (sendSeg->ws_file >= 0) - close(sendSeg->ws_file); - sendSeg->ws_file = -1; + if (xlogreader->seg.ws_file >= 0) + wal_segment_close(xlogreader); /* Send CopyDone */ pq_putmessage_noblock('c', NULL, 0); @@ -2760,21 +2759,19 @@ XLogSendPhysical(void) enlargeStringInfo(&output_message, nbytes); retry: - if (!WALRead(&fake_xlogreader, + if (!WALRead(xlogreader, &output_message.data[output_message.len], startptr, nbytes, - sendSeg->ws_tli, /* Pass the current TLI because only - * WalSndSegmentOpen controls whether new - * TLI is needed. */ - sendSeg, - sendCxt, + xlogreader->seg.ws_tli, /* Pass the current TLI because + * only WalSndSegmentOpen controls + * whether new TLI is needed. */ &errinfo)) WALReadRaiseError(&errinfo); /* See logical_read_xlog_page(). */ - XLByteToSeg(startptr, segno, sendCxt->ws_segsize); - CheckXLogRemoved(segno, sendSeg->ws_tli); + XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize); + CheckXLogRemoved(segno, xlogreader->seg.ws_tli); /* * During recovery, the currently-open WAL file might be replaced with the @@ -2792,10 +2789,9 @@ retry: walsnd->needreload = false; SpinLockRelease(&walsnd->mutex); - if (reload && sendSeg->ws_file >= 0) + if (reload && xlogreader->seg.ws_file >= 0) { - close(sendSeg->ws_file); - sendSeg->ws_file = -1; + wal_segment_close(xlogreader); goto retry; } diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index e29f65500f..d1a0678935 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -280,17 +280,15 @@ identify_target_directory(char *directory, char *fname) } /* pg_waldump's XLogReaderRoutine->segment_open callback */ -static int -WALDumpOpenSegment(XLogReaderState *state, - XLogSegNo nextSegNo, WALSegmentContext *segcxt, +static void +WALDumpOpenSegment(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p) { TimeLineID tli = *tli_p; char fname[MAXPGPATH]; - int fd; int tries; - XLogFileName(fname, tli, nextSegNo, segcxt->ws_segsize); + XLogFileName(fname, tli, nextSegNo, state->segcxt.ws_segsize); /* * In follow mode there is a short period of time after the server has @@ -300,9 +298,9 @@ WALDumpOpenSegment(XLogReaderState *state, */ for (tries = 0; tries < 10; tries++) { - fd = open_file_in_directory(segcxt->ws_dir, fname); - if (fd >= 0) - return fd; + state->seg.ws_file = open_file_in_directory(state->segcxt.ws_dir, fname); + if (state->seg.ws_file >= 0) + return; if (errno == ENOENT) { int save_errno = errno; @@ -318,7 +316,6 @@ WALDumpOpenSegment(XLogReaderState *state, } fatal_error("could not find file \"%s\": %m", fname); - return -1; /* keep compiler quiet */ } /* @@ -356,7 +353,6 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, } if (!WALRead(state, readBuff, targetPagePtr, count, private->timeline, - &state->seg, &state->segcxt, &errinfo)) { WALOpenSegment *seg = &errinfo.wre_seg; diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 80cf62acb7..c21b0ba972 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -63,10 +63,9 @@ typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader, int reqLen, XLogRecPtr targetRecPtr, char *readBuf); -typedef int (*WALSegmentOpenCB) (XLogReaderState *xlogreader, - XLogSegNo nextSegNo, - WALSegmentContext *segcxt, - TimeLineID *tli_p); +typedef void (*WALSegmentOpenCB) (XLogReaderState *xlogreader, + XLogSegNo nextSegNo, + TimeLineID *tli_p); typedef void (*WALSegmentCloseCB) (XLogReaderState *xlogreader); typedef struct XLogReaderRoutine @@ -94,21 +93,16 @@ typedef struct XLogReaderRoutine XLogPageReadCB page_read; /* - * Callback to open the specified WAL segment for reading. The file - * descriptor of the opened segment shall be returned. In case of + * Callback to open the specified WAL segment for reading. ->seg.ws_file + * shall be set to the file descriptor of the opened segment. In case of * failure, an error shall be raised by the callback and it shall not * return. * * "nextSegNo" is the number of the segment to be opened. * - * "segcxt" is additional information about the segment. - * * "tli_p" is an input/output argument. WALRead() uses it to pass the * timeline in which the new segment should be found, but the callback can * use it to return the TLI that it actually opened. - * - * BasicOpenFile() is the preferred way to open the segment file in - * backend code, whereas open(2) should be used in frontend. */ WALSegmentOpenCB segment_open; @@ -301,9 +295,7 @@ typedef struct WALReadError extern bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, - TimeLineID tli, WALOpenSegment *seg, - WALSegmentContext *segcxt, - WALReadError *errinfo); + TimeLineID tli, WALReadError *errinfo); /* Functions for decoding an XLogRecord */ diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index 68ce815476..e59b6cf3a9 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -50,9 +50,8 @@ extern void FreeFakeRelcacheEntry(Relation fakerel); extern int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page); -extern int wal_segment_open(XLogReaderState *state, +extern void wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, - WALSegmentContext *segcxt, TimeLineID *tli_p); extern void wal_segment_close(XLogReaderState *state);