Adjust walsender usage of xlogreader, simplify APIs

* Have both physical and logical walsender share a 'xlogreader' state
  struct for tracking state.  This replaces the existing globals sendSeg
  and sendCxt.

* Change WALRead not to receive XLogReaderState->seg and ->segcxt as
  separate arguments anymore; just use the ones from 'state'.  This is
  made possible by the above change.

* Have the XLogReader segment_open contract require the callbacks to
  install the file descriptor in the state struct themselves instead of
  returning it.  xlogreader was already ignoring any possible failure
  return from the callbacks, relying solely on them never returning
  on failure.

  (This point is not altogether excellent, as it means the callbacks
  have to know more of XLogReaderState; but to really improve on that
  we would have to pass back error info from the callbacks to
  xlogreader.  And the complexity would not be saved but instead just
  transferred to the callers of WALRead, which would have to learn how
  to throw errors from the segment_open callback in addition to, as
  currently, from pg_pread.)

* segment_open no longer receives the 'segcxt' as a separate argument,
  since it's part of the XLogReaderState argument.

Per comments from Kyotaro Horiguchi.

Author: Álvaro Herrera <alvherre@alvh.no-ip.org>
Discussion: https://postgr.es/m/20200511203336.GA9913@alvherre.pgsql
This commit is contained in:
Alvaro Herrera 2020-05-13 12:17:08 -04:00
parent 043e3e0401
commit 850196b610
No known key found for this signature in database
GPG Key ID: 1C20ACB9D5C564AE
6 changed files with 83 additions and 105 deletions

View File

@ -1044,14 +1044,12 @@ err:
/* /*
* Helper function to ease writing of XLogRoutine->page_read callbacks. * Helper function to ease writing of XLogRoutine->page_read callbacks.
* If this function is used, caller must supply an open_segment callback in * If this function is used, caller must supply a segment_open callback in
* 'state', as that is used here. * 'state', as that is used here.
* *
* Read 'count' bytes into 'buf', starting at location 'startptr', from WAL * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
* fetched from timeline 'tli'. * fetched from timeline 'tli'.
* *
* 'seg/segcxt' identify the last segment used.
*
* Returns true if succeeded, false if an error occurs, in which case * Returns true if succeeded, false if an error occurs, in which case
* 'errinfo' receives error details. * 'errinfo' receives error details.
* *
@ -1061,7 +1059,6 @@ err:
bool bool
WALRead(XLogReaderState *state, WALRead(XLogReaderState *state,
char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
WALOpenSegment *seg, WALSegmentContext *segcxt,
WALReadError *errinfo) WALReadError *errinfo)
{ {
char *p; char *p;
@ -1078,34 +1075,36 @@ WALRead(XLogReaderState *state,
int segbytes; int segbytes;
int readbytes; int readbytes;
startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize); startoff = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
/* /*
* If the data we want is not in a segment we have open, close what we * If the data we want is not in a segment we have open, close what we
* have (if anything) and open the next one, using the caller's * have (if anything) and open the next one, using the caller's
* provided openSegment callback. * provided openSegment callback.
*/ */
if (seg->ws_file < 0 || if (state->seg.ws_file < 0 ||
!XLByteInSeg(recptr, seg->ws_segno, segcxt->ws_segsize) || !XLByteInSeg(recptr, state->seg.ws_segno, state->segcxt.ws_segsize) ||
tli != seg->ws_tli) tli != state->seg.ws_tli)
{ {
XLogSegNo nextSegNo; XLogSegNo nextSegNo;
if (seg->ws_file >= 0) if (state->seg.ws_file >= 0)
state->routine.segment_close(state); state->routine.segment_close(state);
XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize); XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize);
seg->ws_file = state->routine.segment_open(state, nextSegNo, state->routine.segment_open(state, nextSegNo, &tli);
segcxt, &tli);
/* This shouldn't happen -- indicates a bug in segment_open */
Assert(state->seg.ws_file >= 0);
/* Update the current segment info. */ /* Update the current segment info. */
seg->ws_tli = tli; state->seg.ws_tli = tli;
seg->ws_segno = nextSegNo; state->seg.ws_segno = nextSegNo;
} }
/* How many bytes are within this segment? */ /* How many bytes are within this segment? */
if (nbytes > (segcxt->ws_segsize - startoff)) if (nbytes > (state->segcxt.ws_segsize - startoff))
segbytes = segcxt->ws_segsize - startoff; segbytes = state->segcxt.ws_segsize - startoff;
else else
segbytes = nbytes; segbytes = nbytes;
@ -1115,7 +1114,7 @@ WALRead(XLogReaderState *state,
/* Reset errno first; eases reporting non-errno-affecting errors */ /* Reset errno first; eases reporting non-errno-affecting errors */
errno = 0; errno = 0;
readbytes = pg_pread(seg->ws_file, p, segbytes, (off_t) startoff); readbytes = pg_pread(state->seg.ws_file, p, segbytes, (off_t) startoff);
#ifndef FRONTEND #ifndef FRONTEND
pgstat_report_wait_end(); pgstat_report_wait_end();
@ -1127,7 +1126,7 @@ WALRead(XLogReaderState *state,
errinfo->wre_req = segbytes; errinfo->wre_req = segbytes;
errinfo->wre_read = readbytes; errinfo->wre_read = readbytes;
errinfo->wre_off = startoff; errinfo->wre_off = startoff;
errinfo->wre_seg = *seg; errinfo->wre_seg = state->seg;
return false; return false;
} }

View File

@ -784,18 +784,17 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
} }
/* XLogReaderRoutine->segment_open callback for local pg_wal files */ /* XLogReaderRoutine->segment_open callback for local pg_wal files */
int void
wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo,
WALSegmentContext *segcxt, TimeLineID *tli_p) TimeLineID *tli_p)
{ {
TimeLineID tli = *tli_p; TimeLineID tli = *tli_p;
char path[MAXPGPATH]; char path[MAXPGPATH];
int fd;
XLogFilePath(path, tli, nextSegNo, segcxt->ws_segsize); XLogFilePath(path, tli, nextSegNo, state->segcxt.ws_segsize);
fd = BasicOpenFile(path, O_RDONLY | PG_BINARY); state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
if (fd >= 0) if (state->seg.ws_file >= 0)
return fd; return;
if (errno == ENOENT) if (errno == ENOENT)
ereport(ERROR, ereport(ERROR,
@ -807,8 +806,6 @@ wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo,
(errcode_for_file_access(), (errcode_for_file_access(),
errmsg("could not open file \"%s\": %m", errmsg("could not open file \"%s\": %m",
path))); path)));
return -1; /* keep compiler quiet */
} }
/* stock XLogReaderRoutine->segment_close callback */ /* stock XLogReaderRoutine->segment_close callback */
@ -947,7 +944,6 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
* zero-padded up to the page boundary if it's incomplete. * zero-padded up to the page boundary if it's incomplete.
*/ */
if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli, if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli,
&state->seg, &state->segcxt,
&errinfo)) &errinfo))
WALReadRaiseError(&errinfo); WALReadRaiseError(&errinfo);

View File

@ -129,8 +129,14 @@ bool log_replication_commands = false;
*/ */
bool wake_wal_senders = false; bool wake_wal_senders = false;
static WALOpenSegment *sendSeg = NULL; /*
static WALSegmentContext *sendCxt = NULL; * Physical walsender does not use xlogreader to read WAL, but it does use a
* fake one to keep state. Logical walsender uses a proper xlogreader. Both
* keep the 'xlogreader' pointer to the right one, for the sake of common
* routines.
*/
static XLogReaderState fake_xlogreader;
static XLogReaderState *xlogreader;
/* /*
* These variables keep track of the state of the timeline we're currently * These variables keep track of the state of the timeline we're currently
@ -248,8 +254,8 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now); static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
static int WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
WALSegmentContext *segcxt, TimeLineID *tli_p); TimeLineID *tli_p);
static void UpdateSpillStats(LogicalDecodingContext *ctx); static void UpdateSpillStats(LogicalDecodingContext *ctx);
@ -280,12 +286,19 @@ InitWalSender(void)
/* Initialize empty timestamp buffer for lag tracking. */ /* Initialize empty timestamp buffer for lag tracking. */
lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker)); lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
/* Make sure we can remember the current read position in XLOG. */ /*
sendSeg = (WALOpenSegment *) * Prepare physical walsender's fake xlogreader struct. Logical walsender
MemoryContextAlloc(TopMemoryContext, sizeof(WALOpenSegment)); * does this later.
sendCxt = (WALSegmentContext *) */
MemoryContextAlloc(TopMemoryContext, sizeof(WALSegmentContext)); if (!am_db_walsender)
WALOpenSegmentInit(sendSeg, sendCxt, wal_segment_size, NULL); {
xlogreader = &fake_xlogreader;
xlogreader->routine =
*XL_ROUTINE(.segment_open = WalSndSegmentOpen,
.segment_close = wal_segment_close);
WALOpenSegmentInit(&xlogreader->seg, &xlogreader->segcxt,
wal_segment_size, NULL);
}
} }
/* /*
@ -302,11 +315,8 @@ WalSndErrorCleanup(void)
ConditionVariableCancelSleep(); ConditionVariableCancelSleep();
pgstat_report_wait_end(); pgstat_report_wait_end();
if (sendSeg->ws_file >= 0) if (xlogreader->seg.ws_file >= 0)
{ wal_segment_close(xlogreader);
close(sendSeg->ws_file);
sendSeg->ws_file = -1;
}
if (MyReplicationSlot != NULL) if (MyReplicationSlot != NULL)
ReplicationSlotRelease(); ReplicationSlotRelease();
@ -837,11 +847,9 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
cur_page, cur_page,
targetPagePtr, targetPagePtr,
XLOG_BLCKSZ, XLOG_BLCKSZ,
sendSeg->ws_tli, /* Pass the current TLI because only state->seg.ws_tli, /* Pass the current TLI because only
* WalSndSegmentOpen controls whether new * WalSndSegmentOpen controls whether new
* TLI is needed. */ * TLI is needed. */
sendSeg,
sendCxt,
&errinfo)) &errinfo))
WALReadRaiseError(&errinfo); WALReadRaiseError(&errinfo);
@ -852,8 +860,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
* read() succeeds in that case, but the data we tried to read might * read() succeeds in that case, but the data we tried to read might
* already have been overwritten with new WAL records. * already have been overwritten with new WAL records.
*/ */
XLByteToSeg(targetPagePtr, segno, sendCxt->ws_segsize); XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
CheckXLogRemoved(segno, sendSeg->ws_tli); CheckXLogRemoved(segno, state->seg.ws_tli);
return count; return count;
} }
@ -1176,6 +1184,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
.segment_close = wal_segment_close), .segment_close = wal_segment_close),
WalSndPrepareWrite, WalSndWriteData, WalSndPrepareWrite, WalSndWriteData,
WalSndUpdateProgress); WalSndUpdateProgress);
xlogreader = logical_decoding_ctx->reader;
WalSndSetState(WALSNDSTATE_CATCHUP); WalSndSetState(WALSNDSTATE_CATCHUP);
@ -2447,13 +2456,11 @@ WalSndKill(int code, Datum arg)
} }
/* XLogReaderRoutine->segment_open callback */ /* XLogReaderRoutine->segment_open callback */
static int static void
WalSndSegmentOpen(XLogReaderState *state, WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
XLogSegNo nextSegNo, WALSegmentContext *segcxt,
TimeLineID *tli_p) TimeLineID *tli_p)
{ {
char path[MAXPGPATH]; char path[MAXPGPATH];
int fd;
/*------- /*-------
* When reading from a historic timeline, and there is a timeline switch * When reading from a historic timeline, and there is a timeline switch
@ -2484,15 +2491,15 @@ WalSndSegmentOpen(XLogReaderState *state,
{ {
XLogSegNo endSegNo; XLogSegNo endSegNo;
XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize); XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
if (sendSeg->ws_segno == endSegNo) if (state->seg.ws_segno == endSegNo)
*tli_p = sendTimeLineNextTLI; *tli_p = sendTimeLineNextTLI;
} }
XLogFilePath(path, *tli_p, nextSegNo, segcxt->ws_segsize); XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
fd = BasicOpenFile(path, O_RDONLY | PG_BINARY); state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
if (fd >= 0) if (state->seg.ws_file >= 0)
return fd; return;
/* /*
* If the file is not found, assume it's because the standby asked for a * If the file is not found, assume it's because the standby asked for a
@ -2515,7 +2522,6 @@ WalSndSegmentOpen(XLogReaderState *state,
(errcode_for_file_access(), (errcode_for_file_access(),
errmsg("could not open file \"%s\": %m", errmsg("could not open file \"%s\": %m",
path))); path)));
return -1; /* keep compiler quiet */
} }
/* /*
@ -2537,12 +2543,6 @@ XLogSendPhysical(void)
Size nbytes; Size nbytes;
XLogSegNo segno; XLogSegNo segno;
WALReadError errinfo; WALReadError errinfo;
static XLogReaderState fake_xlogreader =
{
/* Fake xlogreader state for WALRead */
.routine.segment_open = WalSndSegmentOpen,
.routine.segment_close = wal_segment_close
};
/* If requested switch the WAL sender to the stopping state. */ /* If requested switch the WAL sender to the stopping state. */
if (got_STOPPING) if (got_STOPPING)
@ -2685,9 +2685,8 @@ XLogSendPhysical(void)
if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr) if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
{ {
/* close the current file. */ /* close the current file. */
if (sendSeg->ws_file >= 0) if (xlogreader->seg.ws_file >= 0)
close(sendSeg->ws_file); wal_segment_close(xlogreader);
sendSeg->ws_file = -1;
/* Send CopyDone */ /* Send CopyDone */
pq_putmessage_noblock('c', NULL, 0); pq_putmessage_noblock('c', NULL, 0);
@ -2760,21 +2759,19 @@ XLogSendPhysical(void)
enlargeStringInfo(&output_message, nbytes); enlargeStringInfo(&output_message, nbytes);
retry: retry:
if (!WALRead(&fake_xlogreader, if (!WALRead(xlogreader,
&output_message.data[output_message.len], &output_message.data[output_message.len],
startptr, startptr,
nbytes, nbytes,
sendSeg->ws_tli, /* Pass the current TLI because only xlogreader->seg.ws_tli, /* Pass the current TLI because
* WalSndSegmentOpen controls whether new * only WalSndSegmentOpen controls
* TLI is needed. */ * whether new TLI is needed. */
sendSeg,
sendCxt,
&errinfo)) &errinfo))
WALReadRaiseError(&errinfo); WALReadRaiseError(&errinfo);
/* See logical_read_xlog_page(). */ /* See logical_read_xlog_page(). */
XLByteToSeg(startptr, segno, sendCxt->ws_segsize); XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
CheckXLogRemoved(segno, sendSeg->ws_tli); CheckXLogRemoved(segno, xlogreader->seg.ws_tli);
/* /*
* During recovery, the currently-open WAL file might be replaced with the * During recovery, the currently-open WAL file might be replaced with the
@ -2792,10 +2789,9 @@ retry:
walsnd->needreload = false; walsnd->needreload = false;
SpinLockRelease(&walsnd->mutex); SpinLockRelease(&walsnd->mutex);
if (reload && sendSeg->ws_file >= 0) if (reload && xlogreader->seg.ws_file >= 0)
{ {
close(sendSeg->ws_file); wal_segment_close(xlogreader);
sendSeg->ws_file = -1;
goto retry; goto retry;
} }

View File

@ -280,17 +280,15 @@ identify_target_directory(char *directory, char *fname)
} }
/* pg_waldump's XLogReaderRoutine->segment_open callback */ /* pg_waldump's XLogReaderRoutine->segment_open callback */
static int static void
WALDumpOpenSegment(XLogReaderState *state, WALDumpOpenSegment(XLogReaderState *state, XLogSegNo nextSegNo,
XLogSegNo nextSegNo, WALSegmentContext *segcxt,
TimeLineID *tli_p) TimeLineID *tli_p)
{ {
TimeLineID tli = *tli_p; TimeLineID tli = *tli_p;
char fname[MAXPGPATH]; char fname[MAXPGPATH];
int fd;
int tries; int tries;
XLogFileName(fname, tli, nextSegNo, segcxt->ws_segsize); XLogFileName(fname, tli, nextSegNo, state->segcxt.ws_segsize);
/* /*
* In follow mode there is a short period of time after the server has * In follow mode there is a short period of time after the server has
@ -300,9 +298,9 @@ WALDumpOpenSegment(XLogReaderState *state,
*/ */
for (tries = 0; tries < 10; tries++) for (tries = 0; tries < 10; tries++)
{ {
fd = open_file_in_directory(segcxt->ws_dir, fname); state->seg.ws_file = open_file_in_directory(state->segcxt.ws_dir, fname);
if (fd >= 0) if (state->seg.ws_file >= 0)
return fd; return;
if (errno == ENOENT) if (errno == ENOENT)
{ {
int save_errno = errno; int save_errno = errno;
@ -318,7 +316,6 @@ WALDumpOpenSegment(XLogReaderState *state,
} }
fatal_error("could not find file \"%s\": %m", fname); fatal_error("could not find file \"%s\": %m", fname);
return -1; /* keep compiler quiet */
} }
/* /*
@ -356,7 +353,6 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
} }
if (!WALRead(state, readBuff, targetPagePtr, count, private->timeline, if (!WALRead(state, readBuff, targetPagePtr, count, private->timeline,
&state->seg, &state->segcxt,
&errinfo)) &errinfo))
{ {
WALOpenSegment *seg = &errinfo.wre_seg; WALOpenSegment *seg = &errinfo.wre_seg;

View File

@ -63,10 +63,9 @@ typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,
int reqLen, int reqLen,
XLogRecPtr targetRecPtr, XLogRecPtr targetRecPtr,
char *readBuf); char *readBuf);
typedef int (*WALSegmentOpenCB) (XLogReaderState *xlogreader, typedef void (*WALSegmentOpenCB) (XLogReaderState *xlogreader,
XLogSegNo nextSegNo, XLogSegNo nextSegNo,
WALSegmentContext *segcxt, TimeLineID *tli_p);
TimeLineID *tli_p);
typedef void (*WALSegmentCloseCB) (XLogReaderState *xlogreader); typedef void (*WALSegmentCloseCB) (XLogReaderState *xlogreader);
typedef struct XLogReaderRoutine typedef struct XLogReaderRoutine
@ -94,21 +93,16 @@ typedef struct XLogReaderRoutine
XLogPageReadCB page_read; XLogPageReadCB page_read;
/* /*
* Callback to open the specified WAL segment for reading. The file * Callback to open the specified WAL segment for reading. ->seg.ws_file
* descriptor of the opened segment shall be returned. In case of * shall be set to the file descriptor of the opened segment. In case of
* failure, an error shall be raised by the callback and it shall not * failure, an error shall be raised by the callback and it shall not
* return. * return.
* *
* "nextSegNo" is the number of the segment to be opened. * "nextSegNo" is the number of the segment to be opened.
* *
* "segcxt" is additional information about the segment.
*
* "tli_p" is an input/output argument. WALRead() uses it to pass the * "tli_p" is an input/output argument. WALRead() uses it to pass the
* timeline in which the new segment should be found, but the callback can * timeline in which the new segment should be found, but the callback can
* use it to return the TLI that it actually opened. * use it to return the TLI that it actually opened.
*
* BasicOpenFile() is the preferred way to open the segment file in
* backend code, whereas open(2) should be used in frontend.
*/ */
WALSegmentOpenCB segment_open; WALSegmentOpenCB segment_open;
@ -301,9 +295,7 @@ typedef struct WALReadError
extern bool WALRead(XLogReaderState *state, extern bool WALRead(XLogReaderState *state,
char *buf, XLogRecPtr startptr, Size count, char *buf, XLogRecPtr startptr, Size count,
TimeLineID tli, WALOpenSegment *seg, TimeLineID tli, WALReadError *errinfo);
WALSegmentContext *segcxt,
WALReadError *errinfo);
/* Functions for decoding an XLogRecord */ /* Functions for decoding an XLogRecord */

View File

@ -50,9 +50,8 @@ extern void FreeFakeRelcacheEntry(Relation fakerel);
extern int read_local_xlog_page(XLogReaderState *state, extern int read_local_xlog_page(XLogReaderState *state,
XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetPagePtr, int reqLen,
XLogRecPtr targetRecPtr, char *cur_page); XLogRecPtr targetRecPtr, char *cur_page);
extern int wal_segment_open(XLogReaderState *state, extern void wal_segment_open(XLogReaderState *state,
XLogSegNo nextSegNo, XLogSegNo nextSegNo,
WALSegmentContext *segcxt,
TimeLineID *tli_p); TimeLineID *tli_p);
extern void wal_segment_close(XLogReaderState *state); extern void wal_segment_close(XLogReaderState *state);