diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 89335b64a2..3137cb3ecc 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1330,11 +1330,8 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) char *errormsg; TimeLineID save_currtli = ThisTimeLineID; - xlogreader = XLogReaderAllocate(wal_segment_size, NULL, - XL_ROUTINE(.page_read = &read_local_xlog_page, - .segment_open = &wal_segment_open, - .segment_close = &wal_segment_close), - NULL); + xlogreader = XLogReaderAllocate(wal_segment_size, NULL, wal_segment_close); + if (!xlogreader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), @@ -1342,7 +1339,12 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) errdetail("Failed while allocating a WAL reading processor."))); XLogBeginRead(xlogreader, lsn); - record = XLogReadRecord(xlogreader, &errormsg); + while (XLogReadRecord(xlogreader, &record, &errormsg) == + XLREAD_NEED_DATA) + { + if (!read_local_xlog_page(xlogreader)) + break; + } /* * Restore immediately the timeline where it was previously, as diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index c1d4415a43..7faac01bf2 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -811,17 +811,13 @@ static XLogSegNo openLogSegNo = 0; * These variables are used similarly to the ones above, but for reading * the XLOG. Note, however, that readOff generally represents the offset * of the page just read, not the seek position of the FD itself, which - * will be just past that page. readLen indicates how much of the current - * page has been read into readBuf, and readSource indicates where we got - * the currently open file from. + * will be just past that page. readSource indicates where we got the + * currently open file from. 
* Note: we could use Reserve/ReleaseExternalFD to track consumption of * this FD too; but it doesn't currently seem worthwhile, since the XLOG is * not read by general-purpose sessions. */ static int readFile = -1; -static XLogSegNo readSegNo = 0; -static uint32 readOff = 0; -static uint32 readLen = 0; static XLogSource readSource = XLOG_FROM_ANY; /* @@ -838,13 +834,6 @@ static XLogSource currentSource = XLOG_FROM_ANY; static bool lastSourceFailed = false; static bool pendingWalRcvRestart = false; -typedef struct XLogPageReadPrivate -{ - int emode; - bool fetching_ckpt; /* are we fetching a checkpoint record? */ - bool randAccess; -} XLogPageReadPrivate; - /* * These variables track when we last obtained some WAL data to process, * and where we got it from. (XLogReceiptSource is initially the same as @@ -920,10 +909,12 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, XLogSource source, bool notfoundOk); static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source); -static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *readBuf); +static bool XLogPageRead(XLogReaderState *state, + bool fetching_ckpt, int emode, bool randAccess); static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, - bool fetching_ckpt, XLogRecPtr tliRecPtr); + bool fetching_ckpt, + XLogRecPtr tliRecPtr, + XLogSegNo readSegNo); static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr); static void XLogFileClose(void); static void PreallocXlogFiles(XLogRecPtr endptr); @@ -1234,8 +1225,7 @@ XLogInsertRecord(XLogRecData *rdata, appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len); if (!debug_reader) - debug_reader = XLogReaderAllocate(wal_segment_size, NULL, - XL_ROUTINE(), NULL); + debug_reader = XLogReaderAllocate(wal_segment_size, NULL, NULL); if (!debug_reader) { @@ -4373,12 +4363,7 @@ 
ReadRecord(XLogReaderState *xlogreader, int emode, bool fetching_ckpt) { XLogRecord *record; - XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data; - - /* Pass through parameters to XLogPageRead */ - private->fetching_ckpt = fetching_ckpt; - private->emode = emode; - private->randAccess = (xlogreader->ReadRecPtr == InvalidXLogRecPtr); + bool randAccess = (xlogreader->ReadRecPtr == InvalidXLogRecPtr); /* This is the first attempt to read this page. */ lastSourceFailed = false; @@ -4386,8 +4371,16 @@ ReadRecord(XLogReaderState *xlogreader, int emode, for (;;) { char *errormsg; + XLogReadRecordResult result; + + while ((result = XLogReadRecord(xlogreader, &record, &errormsg)) + == XLREAD_NEED_DATA) + { + if (!XLogPageRead(xlogreader, fetching_ckpt, emode, randAccess)) + break; + + } - record = XLogReadRecord(xlogreader, &errormsg); ReadRecPtr = xlogreader->ReadRecPtr; EndRecPtr = xlogreader->EndRecPtr; if (record == NULL) @@ -6457,7 +6450,6 @@ StartupXLOG(void) bool backupFromStandby = false; DBState dbstate_at_startup; XLogReaderState *xlogreader; - XLogPageReadPrivate private; bool promoted = false; struct stat st; @@ -6616,13 +6608,9 @@ StartupXLOG(void) OwnLatch(&XLogCtl->recoveryWakeupLatch); /* Set up XLOG reader facility */ - MemSet(&private, 0, sizeof(XLogPageReadPrivate)); xlogreader = - XLogReaderAllocate(wal_segment_size, NULL, - XL_ROUTINE(.page_read = &XLogPageRead, - .segment_open = NULL, - .segment_close = wal_segment_close), - &private); + XLogReaderAllocate(wal_segment_size, NULL, wal_segment_close); + if (!xlogreader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), @@ -7819,7 +7807,8 @@ StartupXLOG(void) XLogRecPtr pageBeginPtr; pageBeginPtr = EndOfLog - (EndOfLog % XLOG_BLCKSZ); - Assert(readOff == XLogSegmentOffset(pageBeginPtr, wal_segment_size)); + Assert(XLogSegmentOffset(xlogreader->readPagePtr, wal_segment_size) == + XLogSegmentOffset(pageBeginPtr, wal_segment_size)); firstIdx = XLogRecPtrToBufIdx(EndOfLog); @@ 
-12107,13 +12096,15 @@ CancelBackup(void) * XLogPageRead() to try fetching the record from another source, or to * sleep and retry. */ -static int -XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, - XLogRecPtr targetRecPtr, char *readBuf) +static bool +XLogPageRead(XLogReaderState *state, + bool fetching_ckpt, int emode, bool randAccess) { - XLogPageReadPrivate *private = - (XLogPageReadPrivate *) xlogreader->private_data; - int emode = private->emode; + char *readBuf = state->readBuf; + XLogRecPtr targetPagePtr = state->readPagePtr; + int reqLen = state->reqLen; + int readLen = 0; + XLogRecPtr targetRecPtr = state->ReadRecPtr; uint32 targetPageOff; XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY; int r; @@ -12126,7 +12117,7 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, * is not in the currently open one. */ if (readFile >= 0 && - !XLByteInSeg(targetPagePtr, readSegNo, wal_segment_size)) + !XLByteInSeg(targetPagePtr, state->seg.ws_segno, wal_segment_size)) { /* * Request a restartpoint if we've replayed too much xlog since the @@ -12134,10 +12125,10 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, */ if (bgwriterLaunched) { - if (XLogCheckpointNeeded(readSegNo)) + if (XLogCheckpointNeeded(state->seg.ws_segno)) { (void) GetRedoRecPtr(); - if (XLogCheckpointNeeded(readSegNo)) + if (XLogCheckpointNeeded(state->seg.ws_segno)) RequestCheckpoint(CHECKPOINT_CAUSE_XLOG); } } @@ -12147,7 +12138,7 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, readSource = XLOG_FROM_ANY; } - XLByteToSeg(targetPagePtr, readSegNo, wal_segment_size); + XLByteToSeg(targetPagePtr, state->seg.ws_segno, wal_segment_size); retry: /* See if we need to retrieve more data */ @@ -12156,17 +12147,15 @@ retry: flushedUpto < targetPagePtr + reqLen)) { if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen, - private->randAccess, - private->fetching_ckpt, - targetRecPtr)) 
+ randAccess, fetching_ckpt, + targetRecPtr, state->seg.ws_segno)) { if (readFile >= 0) close(readFile); readFile = -1; - readLen = 0; readSource = XLOG_FROM_ANY; - - return -1; + XLogReaderSetInputData(state, -1); + return false; } } @@ -12193,40 +12182,36 @@ retry: else readLen = XLOG_BLCKSZ; - /* Read the requested page */ - readOff = targetPageOff; - pgstat_report_wait_start(WAIT_EVENT_WAL_READ); - r = pg_pread(readFile, readBuf, XLOG_BLCKSZ, (off_t) readOff); + r = pg_pread(readFile, readBuf, XLOG_BLCKSZ, (off_t) targetPageOff); if (r != XLOG_BLCKSZ) { char fname[MAXFNAMELEN]; int save_errno = errno; pgstat_report_wait_end(); - XLogFileName(fname, curFileTLI, readSegNo, wal_segment_size); + XLogFileName(fname, curFileTLI, state->seg.ws_segno, wal_segment_size); if (r < 0) { errno = save_errno; ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen), (errcode_for_file_access(), errmsg("could not read from log segment %s, offset %u: %m", - fname, readOff))); + fname, targetPageOff))); } else ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen), (errcode(ERRCODE_DATA_CORRUPTED), errmsg("could not read from log segment %s, offset %u: read %d of %zu", - fname, readOff, r, (Size) XLOG_BLCKSZ))); + fname, targetPageOff, r, (Size) XLOG_BLCKSZ))); goto next_record_is_invalid; } pgstat_report_wait_end(); - Assert(targetSegNo == readSegNo); - Assert(targetPageOff == readOff); - Assert(reqLen <= readLen); + Assert(targetSegNo == state->seg.ws_segno); + Assert(readLen >= reqLen); - xlogreader->seg.ws_tli = curFileTLI; + state->seg.ws_tli = curFileTLI; /* * Check the page header immediately, so that we can retry immediately if @@ -12254,14 +12239,16 @@ retry: * Validating the page header is cheap enough that doing it twice * shouldn't be a big deal from a performance point of view. 
*/ - if (!XLogReaderValidatePageHeader(xlogreader, targetPagePtr, readBuf)) + if (!XLogReaderValidatePageHeader(state, targetPagePtr, readBuf)) { - /* reset any error XLogReaderValidatePageHeader() might have set */ - xlogreader->errormsg_buf[0] = '\0'; + /* reset any error XLogReaderValidatePageHeader() might have set */ + state->errormsg_buf[0] = '\0'; goto next_record_is_invalid; } - return readLen; + Assert(state->readPagePtr == targetPagePtr); + XLogReaderSetInputData(state, readLen); + return true; next_record_is_invalid: lastSourceFailed = true; @@ -12269,14 +12256,14 @@ next_record_is_invalid: if (readFile >= 0) close(readFile); readFile = -1; - readLen = 0; readSource = XLOG_FROM_ANY; /* In standby-mode, keep trying */ if (StandbyMode) goto retry; - else - return -1; + + XLogReaderSetInputData(state, -1); + return false; } /* @@ -12307,7 +12294,8 @@ next_record_is_invalid: */ static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, - bool fetching_ckpt, XLogRecPtr tliRecPtr) + bool fetching_ckpt, XLogRecPtr tliRecPtr, + XLogSegNo readSegNo) { static TimestampTz last_fail_time = 0; TimestampTz now; diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 42738eb940..02257768ec 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -36,11 +36,11 @@ static void report_invalid_record(XLogReaderState *state, const char *fmt,...) 
pg_attribute_printf(2, 3); static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength); -static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, - int reqLen); +static bool XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, + int reqLen, bool header_inclusive); static void XLogReaderInvalReadState(XLogReaderState *state); static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, - XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess); + XLogRecPtr PrevRecPtr, XLogRecord *record); static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr); static void ResetDecoder(XLogReaderState *state); @@ -73,7 +73,7 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...) */ XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, - XLogReaderRoutine *routine, void *private_data) + WALSegmentCleanupCB cleanup_cb) { XLogReaderState *state; @@ -84,7 +84,7 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, return NULL; /* initialize caller-provided support functions */ - state->routine = *routine; + state->cleanup_cb = cleanup_cb; state->max_block_id = -1; @@ -107,9 +107,7 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size, waldir); - /* system_identifier initialized to zeroes above */ - state->private_data = private_data; - /* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */ + /* ReadRecPtr, EndRecPtr, reqLen and readLen initialized to zeroes above */ state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1, MCXT_ALLOC_NO_OOM); if (!state->errormsg_buf) @@ -140,8 +138,8 @@ XLogReaderFree(XLogReaderState *state) { int block_id; - if (state->seg.ws_file != -1) - state->routine.segment_close(state); + if (state->seg.ws_file >= 0) + state->cleanup_cb(state); for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++) { @@ -246,6 +244,7 @@ 
XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) /* Begin at the passed-in record pointer. */ state->EndRecPtr = RecPtr; state->ReadRecPtr = InvalidXLogRecPtr; + state->readRecordState = XLREAD_NEXT_RECORD; } /* @@ -254,303 +253,456 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) * XLogBeginRead() or XLogFindNextRecord() must be called before the first call * to XLogReadRecord(). * - * If the page_read callback fails to read the requested data, NULL is - * returned. The callback is expected to have reported the error; errormsg - * is set to NULL. + * This function may return XLREAD_NEED_DATA several times before returning a + * result record. The caller shall read in some new data then call this + * function again with the same parameters. * - * If the reading fails for some other reason, NULL is also returned, and - * *errormsg is set to a string with details of the failure. + * When a record is successfully read, returns XLREAD_SUCCESS with result + * record being stored in *record. Otherwise *record is NULL. * - * The returned pointer (or *errormsg) points to an internal buffer that's - * valid until the next call to XLogReadRecord. - */ -XLogRecord * -XLogReadRecord(XLogReaderState *state, char **errormsg) -{ - XLogRecPtr RecPtr; - XLogRecord *record; - XLogRecPtr targetPagePtr; - bool randAccess; - uint32 len, - total_len; - uint32 targetRecOff; - uint32 pageHeaderSize; - bool gotheader; - int readOff; + * Returns XLREAD_NEED_DATA if more data is needed to finish decoding the + * current record. In that case, state->readPagePtr and state->reqLen inform + * the desired position and minimum length of data needed. The caller shall + * read in the requested data and set state->readBuf to point to a buffer + * containing it. The caller must also set state->seg.ws_tli and + * state->readLen to indicate the timeline that it was read from, and the + * length of data that is now available (which must be >= given reqLen), + * respectively. 
+ * + * If invalid data is encountered, returns XLREAD_FAIL and sets *record to + * NULL. *errormsg is set to a string with details of the failure. The + * returned pointer (or *errormsg) points to an internal buffer that's valid + * until the next call to XLogReadRecord. + * + * + * This function runs a state machine consisting of the following states. + * + * XLREAD_NEXT_RECORD: + * The initial state. If called with a valid XLogRecPtr, try to read a + * record at that position. If invalid RecPtr is given try to read a record + * just after the last one read. The next state is XLREAD_TOT_LEN. + * + * XLREAD_TOT_LEN: + * Examining record header. Ends after reading record length. + * recordRemainLen and recordGotLen are initialized. The next state is + * XLREAD_FIRST_FRAGMENT. + * + * XLREAD_FIRST_FRAGMENT: + * Reading the first fragment. Goes to XLREAD_NEXT_RECORD if that's all or + * XLREAD_CONTINUATION if we need more data. - /* - * randAccess indicates whether to verify the previous-record pointer of - * the record we're reading. We only do this if we're reading - * sequentially, which is what we initially assume. - */ - randAccess = false; + * XLREAD_CONTINUATION: + * Reading continuation of record. If the whole record is now decoded, goes + * to XLREAD_NEXT_RECORD. During this state, recordRemainLen indicates how + * much is left. + * + * If invalid data is found in any state, the state machine stays at the + * current state. This behavior allows us to continue reading a record + * after switching to a different source, during streaming replication. 
+ */ +XLogReadRecordResult +XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) +{ + XLogRecord *prec; + + *record = NULL; /* reset error state */ *errormsg = NULL; state->errormsg_buf[0] = '\0'; - ResetDecoder(state); - - RecPtr = state->EndRecPtr; - - if (state->ReadRecPtr != InvalidXLogRecPtr) + switch (state->readRecordState) { - /* read the record after the one we just read */ + case XLREAD_NEXT_RECORD: + ResetDecoder(state); - /* - * EndRecPtr is pointing to end+1 of the previous WAL record. If - * we're at a page boundary, no more records can fit on the current - * page. We must skip over the page header, but we can't do that until - * we've read in the page, since the header size is variable. - */ - } - else - { - /* - * Caller supplied a position to start at. - * - * In this case, EndRecPtr should already be pointing to a valid - * record starting position. - */ - Assert(XRecOffIsValid(RecPtr)); - randAccess = true; - } - - state->currRecPtr = RecPtr; - - targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ); - targetRecOff = RecPtr % XLOG_BLCKSZ; - - /* - * Read the page containing the record into state->readBuf. Request enough - * byte to cover the whole record header, or at least the part of it that - * fits on the same page. - */ - readOff = ReadPageInternal(state, targetPagePtr, - Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ)); - if (readOff < 0) - goto err; - - /* - * ReadPageInternal always returns at least the page header, so we can - * examine it now. - */ - pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf); - if (targetRecOff == 0) - { - /* - * At page start, so skip over page header. 
- */ - RecPtr += pageHeaderSize; - targetRecOff = pageHeaderSize; - } - else if (targetRecOff < pageHeaderSize) - { - report_invalid_record(state, "invalid record offset at %X/%X", - LSN_FORMAT_ARGS(RecPtr)); - goto err; - } - - if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) && - targetRecOff == pageHeaderSize) - { - report_invalid_record(state, "contrecord is requested by %X/%X", - LSN_FORMAT_ARGS(RecPtr)); - goto err; - } - - /* ReadPageInternal has verified the page header */ - Assert(pageHeaderSize <= readOff); - - /* - * Read the record length. - * - * NB: Even though we use an XLogRecord pointer here, the whole record - * header might not fit on this page. xl_tot_len is the first field of the - * struct, so it must be on this page (the records are MAXALIGNed), but we - * cannot access any other fields until we've verified that we got the - * whole header. - */ - record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ); - total_len = record->xl_tot_len; - - /* - * If the whole record header is on this page, validate it immediately. - * Otherwise do just a basic sanity check on xl_tot_len, and validate the - * rest of the header after reading it from the next page. The xl_tot_len - * check is necessary here to ensure that we enter the "Need to reassemble - * record" code path below; otherwise we might fail to apply - * ValidXLogRecordHeader at all. 
- */ - if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord) - { - if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record, - randAccess)) - goto err; - gotheader = true; - } - else - { - /* XXX: more validation should be done here */ - if (total_len < SizeOfXLogRecord) - { - report_invalid_record(state, - "invalid record length at %X/%X: wanted %u, got %u", - LSN_FORMAT_ARGS(RecPtr), - (uint32) SizeOfXLogRecord, total_len); - goto err; - } - gotheader = false; - } - - len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ; - if (total_len > len) - { - /* Need to reassemble record */ - char *contdata; - XLogPageHeader pageHeader; - char *buffer; - uint32 gotlen; - - /* - * Enlarge readRecordBuf as needed. - */ - if (total_len > state->readRecordBufSize && - !allocate_recordbuf(state, total_len)) - { - /* We treat this as a "bogus data" condition */ - report_invalid_record(state, "record length %u at %X/%X too long", - total_len, LSN_FORMAT_ARGS(RecPtr)); - goto err; - } - - /* Copy the first fragment of the record from the first page. */ - memcpy(state->readRecordBuf, - state->readBuf + RecPtr % XLOG_BLCKSZ, len); - buffer = state->readRecordBuf + len; - gotlen = len; - - do - { - /* Calculate pointer to beginning of next page */ - targetPagePtr += XLOG_BLCKSZ; - - /* Wait for the next page to become available */ - readOff = ReadPageInternal(state, targetPagePtr, - Min(total_len - gotlen + SizeOfXLogShortPHD, - XLOG_BLCKSZ)); - - if (readOff < 0) - goto err; - - Assert(SizeOfXLogShortPHD <= readOff); - - /* Check that the continuation on next page looks valid */ - pageHeader = (XLogPageHeader) state->readBuf; - if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD)) + if (state->ReadRecPtr != InvalidXLogRecPtr) { - report_invalid_record(state, - "there is no contrecord flag at %X/%X", - LSN_FORMAT_ARGS(RecPtr)); - goto err; + /* read the record after the one we just read */ + + /* + * EndRecPtr is pointing to end+1 of the previous WAL record. 
+ * If we're at a page boundary, no more records can fit on the + * current page. We must skip over the page header, but we + * can't do that until we've read in the page, since the + * header size is variable. + */ + state->PrevRecPtr = state->ReadRecPtr; + state->ReadRecPtr = state->EndRecPtr; + } + else + { + /* + * Caller supplied a position to start at. + * + * In this case, EndRecPtr should already be pointing to a + * valid record starting position. + */ + Assert(XRecOffIsValid(state->EndRecPtr)); + state->ReadRecPtr = state->EndRecPtr; + + /* + * We cannot verify the previous-record pointer when we're + * seeking to a particular record. Reset PrevRecPtr so that we + * won't try doing that. + */ + state->PrevRecPtr = InvalidXLogRecPtr; + state->EndRecPtr = InvalidXLogRecPtr; /* to be tidy */ } - /* - * Cross-check that xlp_rem_len agrees with how much of the record - * we expect there to be left. - */ - if (pageHeader->xlp_rem_len == 0 || - total_len != (pageHeader->xlp_rem_len + gotlen)) + state->record_verified = false; + state->readRecordState = XLREAD_TOT_LEN; + /* fall through */ + + case XLREAD_TOT_LEN: { - report_invalid_record(state, - "invalid contrecord length %u (expected %lld) at %X/%X", - pageHeader->xlp_rem_len, - ((long long) total_len) - gotlen, - LSN_FORMAT_ARGS(RecPtr)); - goto err; - } + uint32 total_len; + uint32 pageHeaderSize; + XLogRecPtr targetPagePtr; + uint32 targetRecOff; + XLogPageHeader pageHeader; - /* Append the continuation from this page to the buffer */ - pageHeaderSize = XLogPageHeaderSize(pageHeader); + targetPagePtr = + state->ReadRecPtr - (state->ReadRecPtr % XLOG_BLCKSZ); + targetRecOff = state->ReadRecPtr % XLOG_BLCKSZ; - if (readOff < pageHeaderSize) - readOff = ReadPageInternal(state, targetPagePtr, - pageHeaderSize); + /* + * Check if we have enough data. For the first record in the + * page, the requesting length doesn't contain page header. 
+ */ + if (XLogNeedData(state, targetPagePtr, + Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ), + targetRecOff != 0)) + return XLREAD_NEED_DATA; - Assert(pageHeaderSize <= readOff); - - contdata = (char *) state->readBuf + pageHeaderSize; - len = XLOG_BLCKSZ - pageHeaderSize; - if (pageHeader->xlp_rem_len < len) - len = pageHeader->xlp_rem_len; - - if (readOff < pageHeaderSize + len) - readOff = ReadPageInternal(state, targetPagePtr, - pageHeaderSize + len); - - memcpy(buffer, (char *) contdata, len); - buffer += len; - gotlen += len; - - /* If we just reassembled the record header, validate it. */ - if (!gotheader) - { - record = (XLogRecord *) state->readRecordBuf; - if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, - record, randAccess)) + /* error out if caller supplied bogus page */ + if (!state->page_verified) goto err; - gotheader = true; + + /* examine page header now. */ + pageHeaderSize = + XLogPageHeaderSize((XLogPageHeader) state->readBuf); + if (targetRecOff == 0) + { + /* At page start, so skip over page header. */ + state->ReadRecPtr += pageHeaderSize; + targetRecOff = pageHeaderSize; + } + else if (targetRecOff < pageHeaderSize) + { + report_invalid_record(state, "invalid record offset at %X/%X", + LSN_FORMAT_ARGS(state->ReadRecPtr)); + goto err; + } + + pageHeader = (XLogPageHeader) state->readBuf; + if ((pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD) && + targetRecOff == pageHeaderSize) + { + report_invalid_record(state, "contrecord is requested by %X/%X", + (uint32) (state->ReadRecPtr >> 32), + (uint32) state->ReadRecPtr); + goto err; + } + + /* XLogNeedData has verified the page header */ + Assert(pageHeaderSize <= state->readLen); + + /* + * Read the record length. + * + * NB: Even though we use an XLogRecord pointer here, the + * whole record header might not fit on this page. 
xl_tot_len + * is the first field of the struct, so it must be on this + * page (the records are MAXALIGNed), but we cannot access any + * other fields until we've verified that we got the whole + * header. + */ + prec = (XLogRecord *) (state->readBuf + + state->ReadRecPtr % XLOG_BLCKSZ); + total_len = prec->xl_tot_len; + + /* + * If the whole record header is on this page, validate it + * immediately. Otherwise do just a basic sanity check on + * xl_tot_len, and validate the rest of the header after + * reading it from the next page. The xl_tot_len check is + * necessary here to ensure that we enter the + * XLREAD_CONTINUATION state below; otherwise we might fail to + * apply ValidXLogRecordHeader at all. + */ + if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord) + { + if (!ValidXLogRecordHeader(state, state->ReadRecPtr, + state->PrevRecPtr, prec)) + goto err; + + state->record_verified = true; + } + else + { + /* XXX: more validation should be done here */ + if (total_len < SizeOfXLogRecord) + { + report_invalid_record(state, + "invalid record length at %X/%X: wanted %u, got %u", + LSN_FORMAT_ARGS(state->ReadRecPtr), + (uint32) SizeOfXLogRecord, total_len); + goto err; + } + } + + /* + * Wait for the rest of the record, or the part of the record + * that fit on the first page if crossed a page boundary, to + * become available. 
+ */ + state->recordGotLen = 0; + state->recordRemainLen = total_len; + state->readRecordState = XLREAD_FIRST_FRAGMENT; } - } while (gotlen < total_len); + /* fall through */ - Assert(gotheader); + case XLREAD_FIRST_FRAGMENT: + { + uint32 total_len = state->recordRemainLen; + uint32 request_len; + uint32 record_len; + XLogRecPtr targetPagePtr; + uint32 targetRecOff; - record = (XLogRecord *) state->readRecordBuf; - if (!ValidXLogRecord(state, record, RecPtr)) - goto err; + /* + * Wait for the rest of the record on the first page to become + * available + */ + targetPagePtr = + state->ReadRecPtr - (state->ReadRecPtr % XLOG_BLCKSZ); + targetRecOff = state->ReadRecPtr % XLOG_BLCKSZ; - pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf); - state->ReadRecPtr = RecPtr; - state->EndRecPtr = targetPagePtr + pageHeaderSize - + MAXALIGN(pageHeader->xlp_rem_len); - } - else - { - /* Wait for the record data to become available */ - readOff = ReadPageInternal(state, targetPagePtr, - Min(targetRecOff + total_len, XLOG_BLCKSZ)); - if (readOff < 0) - goto err; + request_len = Min(targetRecOff + total_len, XLOG_BLCKSZ); + record_len = request_len - targetRecOff; - /* Record does not cross a page boundary */ - if (!ValidXLogRecord(state, record, RecPtr)) - goto err; + /* ReadRecPtr contains page header */ + Assert(targetRecOff != 0); + if (XLogNeedData(state, targetPagePtr, request_len, true)) + return XLREAD_NEED_DATA; - state->EndRecPtr = RecPtr + MAXALIGN(total_len); + /* error out if caller supplied bogus page */ + if (!state->page_verified) + goto err; - state->ReadRecPtr = RecPtr; + prec = (XLogRecord *) (state->readBuf + targetRecOff); + + /* validate record header if not yet */ + if (!state->record_verified && record_len >= SizeOfXLogRecord) + { + if (!ValidXLogRecordHeader(state, state->ReadRecPtr, + state->PrevRecPtr, prec)) + goto err; + + state->record_verified = true; + } + + + if (total_len == record_len) + { + /* Record does not cross a page boundary 
*/ + Assert(state->record_verified); + + if (!ValidXLogRecord(state, prec, state->ReadRecPtr)) + goto err; + + state->record_verified = true; /* to be tidy */ + + /* We already checked the header earlier */ + state->EndRecPtr = state->ReadRecPtr + MAXALIGN(record_len); + + *record = prec; + state->readRecordState = XLREAD_NEXT_RECORD; + break; + } + + /* + * The record continues on the next page. Need to reassemble + * record + */ + Assert(total_len > record_len); + + /* Enlarge readRecordBuf as needed. */ + if (total_len > state->readRecordBufSize && + !allocate_recordbuf(state, total_len)) + { + /* We treat this as a "bogus data" condition */ + report_invalid_record(state, + "record length %u at %X/%X too long", + total_len, + LSN_FORMAT_ARGS(state->ReadRecPtr)); + goto err; + } + + /* Copy the first fragment of the record from the first page. */ + memcpy(state->readRecordBuf, state->readBuf + targetRecOff, + record_len); + state->recordGotLen += record_len; + state->recordRemainLen -= record_len; + + /* Calculate pointer to beginning of next page */ + state->recordContRecPtr = state->ReadRecPtr + record_len; + Assert(state->recordContRecPtr % XLOG_BLCKSZ == 0); + + state->readRecordState = XLREAD_CONTINUATION; + } + /* fall through */ + + case XLREAD_CONTINUATION: + { + XLogPageHeader pageHeader = NULL; + uint32 pageHeaderSize; + XLogRecPtr targetPagePtr = InvalidXLogRecPtr; + + /* + * we enter this state only if we haven't read the whole + * record. 
+ */ + Assert(state->recordRemainLen > 0); + + while (state->recordRemainLen > 0) + { + char *contdata; + uint32 request_len PG_USED_FOR_ASSERTS_ONLY; + uint32 record_len; + + /* Wait for the next page to become available */ + targetPagePtr = state->recordContRecPtr; + + /* this request contains page header */ + Assert(targetPagePtr != 0); + if (XLogNeedData(state, targetPagePtr, + Min(state->recordRemainLen, XLOG_BLCKSZ), + false)) + return XLREAD_NEED_DATA; + + if (!state->page_verified) + goto err; + + Assert(SizeOfXLogShortPHD <= state->readLen); + + /* Check that the continuation on next page looks valid */ + pageHeader = (XLogPageHeader) state->readBuf; + if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD)) + { + report_invalid_record( + state, + "there is no contrecord flag at %X/%X reading %X/%X", + (uint32) (state->recordContRecPtr >> 32), + (uint32) state->recordContRecPtr, + (uint32) (state->ReadRecPtr >> 32), + (uint32) state->ReadRecPtr); + goto err; + } + + /* + * Cross-check that xlp_rem_len agrees with how much of + * the record we expect there to be left. 
+ */ + if (pageHeader->xlp_rem_len == 0 || + pageHeader->xlp_rem_len != state->recordRemainLen) + { + report_invalid_record( + state, + "invalid contrecord length %u at %X/%X reading %X/%X, expected %u", + pageHeader->xlp_rem_len, + (uint32) (state->recordContRecPtr >> 32), + (uint32) state->recordContRecPtr, + (uint32) (state->ReadRecPtr >> 32), + (uint32) state->ReadRecPtr, + state->recordRemainLen); + goto err; + } + + /* Append the continuation from this page to the buffer */ + pageHeaderSize = XLogPageHeaderSize(pageHeader); + + /* + * XLogNeedData should have ensured that the whole page + * header was read + */ + Assert(pageHeaderSize <= state->readLen); + + contdata = (char *) state->readBuf + pageHeaderSize; + record_len = XLOG_BLCKSZ - pageHeaderSize; + if (pageHeader->xlp_rem_len < record_len) + record_len = pageHeader->xlp_rem_len; + + request_len = record_len + pageHeaderSize; + + /* + * XLogNeedData should have ensured all needed data was + * read + */ + Assert(request_len <= state->readLen); + + memcpy(state->readRecordBuf + state->recordGotLen, + (char *) contdata, record_len); + state->recordGotLen += record_len; + state->recordRemainLen -= record_len; + + /* If we just reassembled the record header, validate it. 
*/ + if (!state->record_verified) + { + Assert(state->recordGotLen >= SizeOfXLogRecord); + if (!ValidXLogRecordHeader(state, state->ReadRecPtr, + state->PrevRecPtr, + (XLogRecord *) state->readRecordBuf)) + goto err; + + state->record_verified = true; + } + + /* + * Calculate pointer to beginning of next page, and + * continue + */ + state->recordContRecPtr += XLOG_BLCKSZ; + } + + /* targetPagePtr is pointing the last-read page here */ + prec = (XLogRecord *) state->readRecordBuf; + if (!ValidXLogRecord(state, prec, state->ReadRecPtr)) + goto err; + + pageHeaderSize = + XLogPageHeaderSize((XLogPageHeader) state->readBuf); + state->EndRecPtr = targetPagePtr + pageHeaderSize + + MAXALIGN(pageHeader->xlp_rem_len); + + *record = prec; + state->readRecordState = XLREAD_NEXT_RECORD; + break; + } } /* * Special processing if it's an XLOG SWITCH record */ - if (record->xl_rmid == RM_XLOG_ID && - (record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH) + if ((*record)->xl_rmid == RM_XLOG_ID && + ((*record)->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH) { /* Pretend it extends to end of segment */ state->EndRecPtr += state->segcxt.ws_segsize - 1; state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize); } - if (DecodeXLogRecord(state, record, errormsg)) - return record; - else - return NULL; + if (DecodeXLogRecord(state, *record, errormsg)) + return XLREAD_SUCCESS; + + *record = NULL; + return XLREAD_FAIL; err: /* - * Invalidate the read state. We might read from a different source after + * Invalidate the read page. We might read from a different source after * failure. */ XLogReaderInvalReadState(state); @@ -558,113 +710,141 @@ err: if (state->errormsg_buf[0] != '\0') *errormsg = state->errormsg_buf; - return NULL; + *record = NULL; + return XLREAD_FAIL; } /* - * Read a single xlog page including at least [pageptr, reqLen] of valid data - * via the page_read() callback. 
+ * Checks that the xlog page loaded in state->readBuf includes at least + * [pageptr, reqLen] and the page is valid. header_inclusive indicates that + * reqLen is calculated including page header length. * - * Returns -1 if the required page cannot be read for some reason; errormsg_buf - * is set in that case (unless the error occurs in the page_read callback). + * Returns false if the buffer already contains the requested data, or on + * error. state->page_verified is set to true for the former and false for the + * latter. * - * We fetch the page from a reader-local cache if we know we have the required - * data and if there hasn't been any error since caching the data. + * Otherwise returns true and requests data loaded onto state->readBuf by + * state->readPagePtr and state->reqLen. The caller shall call this function + * again after filling the buffer at least with that portion of data and set + * state->readLen to the length of actually loaded data. + * + * If header_inclusive is false, corrects reqLen internally by adding the + * actual page header length and may request caller for new data. */ -static int -ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) +static bool +XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, int reqLen, + bool header_inclusive) { - int readLen; uint32 targetPageOff; XLogSegNo targetSegNo; - XLogPageHeader hdr; + uint32 addLen = 0; - Assert((pageptr % XLOG_BLCKSZ) == 0); + /* Some data is loaded, but page header is not verified yet.
*/ + if (!state->page_verified && + !XLogRecPtrIsInvalid(state->readPagePtr) && state->readLen >= 0) + { + uint32 pageHeaderSize; - XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize); - targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize); + /* just loaded new data so needs to verify page header */ - /* check whether we have all the requested data already */ - if (targetSegNo == state->seg.ws_segno && - targetPageOff == state->segoff && reqLen <= state->readLen) - return state->readLen; + /* The caller must have loaded at least page header */ + Assert(state->readLen >= SizeOfXLogShortPHD); + + /* + * We have enough data to check the header length. Recheck the loaded + * length against the actual header length. + */ + pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf); + + /* Request more data if we don't have the full header. */ + if (state->readLen < pageHeaderSize) + { + state->reqLen = pageHeaderSize; + return true; + } + + /* Now that we know we have the full header, validate it. */ + if (!XLogReaderValidatePageHeader(state, state->readPagePtr, + (char *) state->readBuf)) + { + /* That's bad. Force reading the page again. */ + XLogReaderInvalReadState(state); + + return false; + } + + state->page_verified = true; + + XLByteToSeg(state->readPagePtr, state->seg.ws_segno, + state->segcxt.ws_segsize); + } + + /* + * The loaded page may not be the one caller is supposing to read when we + * are verifying the first page of new segment. In that case, skip further + * verification and immediately load the target page. + */ + if (state->page_verified && pageptr == state->readPagePtr) + { + /* + * calculate additional length for page header keeping the total + * length within the block size. 
+ */ + if (!header_inclusive) + { + uint32 pageHeaderSize = + XLogPageHeaderSize((XLogPageHeader) state->readBuf); + + addLen = pageHeaderSize; + if (reqLen + pageHeaderSize <= XLOG_BLCKSZ) + addLen = pageHeaderSize; + else + addLen = XLOG_BLCKSZ - reqLen; + } + + /* Return if we already have it. */ + if (reqLen + addLen <= state->readLen) + return false; + } + + /* Data is not in our buffer, request the caller for it. */ + XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize); + targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize); + Assert((pageptr % XLOG_BLCKSZ) == 0); + + /* + * Every time we request to load new data of a page to the caller, even if + * we looked at a part of it before, we need to do verification on the + * next invocation as the caller might now be rereading data from a + * different source. + */ + state->page_verified = false; /* - * Data is not in our buffer. - * - * Every time we actually read the segment, even if we looked at parts of - * it before, we need to do verification as the page_read callback might - * now be rereading data from a different source. - * * Whenever switching to a new WAL segment, we read the first page of the * file and validate its header, even if that's not where the target * record is. This is so that we can check the additional identification - * info that is present in the first page's "long" header. + * info that is present in the first page's "long" header. Don't do this + * if the caller requested the first page in the segment. 
*/ if (targetSegNo != state->seg.ws_segno && targetPageOff != 0) { - XLogRecPtr targetSegmentPtr = pageptr - targetPageOff; - - readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ, - state->currRecPtr, - state->readBuf); - if (readLen < 0) - goto err; - - /* we can be sure to have enough WAL available, we scrolled back */ - Assert(readLen == XLOG_BLCKSZ); - - if (!XLogReaderValidatePageHeader(state, targetSegmentPtr, - state->readBuf)) - goto err; + /* + * Then we'll see that the targetSegNo now matches the ws_segno, and + * will not come back here, but will request the actual target page. + */ + state->readPagePtr = pageptr - targetPageOff; + state->reqLen = XLOG_BLCKSZ; + return true; } /* - * First, read the requested data length, but at least a short page header - * so that we can validate it. + * Request the caller to load the page. We need at least a short page + * header so that we can validate it. */ - readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD), - state->currRecPtr, - state->readBuf); - if (readLen < 0) - goto err; - - Assert(readLen <= XLOG_BLCKSZ); - - /* Do we have enough data to check the header length? */ - if (readLen <= SizeOfXLogShortPHD) - goto err; - - Assert(readLen >= reqLen); - - hdr = (XLogPageHeader) state->readBuf; - - /* still not enough */ - if (readLen < XLogPageHeaderSize(hdr)) - { - readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr), - state->currRecPtr, - state->readBuf); - if (readLen < 0) - goto err; - } - - /* - * Now that we know we have the full header, validate it. 
- */ - if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr)) - goto err; - - /* update read state information */ - state->seg.ws_segno = targetSegNo; - state->segoff = targetPageOff; - state->readLen = readLen; - - return readLen; - -err: - XLogReaderInvalReadState(state); - return -1; + state->readPagePtr = pageptr; + state->reqLen = Max(reqLen + addLen, SizeOfXLogShortPHD); + return true; } /* @@ -673,9 +853,7 @@ err: static void XLogReaderInvalReadState(XLogReaderState *state) { - state->seg.ws_segno = 0; - state->segoff = 0; - state->readLen = 0; + state->readPagePtr = InvalidXLogRecPtr; } /* @@ -683,11 +861,12 @@ XLogReaderInvalReadState(XLogReaderState *state) * * This is just a convenience subroutine to avoid duplicated code in * XLogReadRecord. It's not intended for use from anywhere else. + * + * If PrevRecPtr is valid, the xl_prev is cross-checked with it. */ static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, - XLogRecPtr PrevRecPtr, XLogRecord *record, - bool randAccess) + XLogRecPtr PrevRecPtr, XLogRecord *record) { if (record->xl_tot_len < SizeOfXLogRecord) { @@ -704,7 +883,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, record->xl_rmid, LSN_FORMAT_ARGS(RecPtr)); return false; } - if (randAccess) + if (PrevRecPtr == InvalidXLogRecPtr) { /* * We can't exactly verify the prev-link, but surely it should be less @@ -922,6 +1101,22 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, * here. */ +XLogFindNextRecordState * +InitXLogFindNextRecord(XLogReaderState *reader_state, XLogRecPtr start_ptr) +{ + XLogFindNextRecordState *state = (XLogFindNextRecordState *) + palloc_extended(sizeof(XLogFindNextRecordState), + MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO); + if (!state) + return NULL; + + state->reader_state = reader_state; + state->targetRecPtr = start_ptr; + state->currRecPtr = start_ptr; + + return state; +} + /* * Find the first record with an lsn >= RecPtr.
* @@ -933,27 +1128,25 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, * This positions the reader, like XLogBeginRead(), so that the next call to * XLogReadRecord() will read the next valid record. */ -XLogRecPtr -XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) +bool +XLogFindNextRecord(XLogFindNextRecordState *state) { - XLogRecPtr tmpRecPtr; - XLogRecPtr found = InvalidXLogRecPtr; XLogPageHeader header; + XLogRecord *record; + XLogReadRecordResult result; char *errormsg; - Assert(!XLogRecPtrIsInvalid(RecPtr)); + Assert(!XLogRecPtrIsInvalid(state->currRecPtr)); /* * skip over potential continuation data, keeping in mind that it may span * multiple pages */ - tmpRecPtr = RecPtr; while (true) { XLogRecPtr targetPagePtr; int targetRecOff; uint32 pageHeaderSize; - int readLen; /* * Compute targetRecOff. It should typically be equal or greater than @@ -961,27 +1154,27 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) * that, except when caller has explicitly specified the offset that * falls somewhere there or when we are skipping multi-page * continuation record. 
It doesn't matter though because - * ReadPageInternal() is prepared to handle that and will read at - * least short page-header worth of data + * XLogNeedData() is prepared to handle that and will read at least + * short page-header worth of data */ - targetRecOff = tmpRecPtr % XLOG_BLCKSZ; + targetRecOff = state->currRecPtr % XLOG_BLCKSZ; /* scroll back to page boundary */ - targetPagePtr = tmpRecPtr - targetRecOff; + targetPagePtr = state->currRecPtr - targetRecOff; - /* Read the page containing the record */ - readLen = ReadPageInternal(state, targetPagePtr, targetRecOff); - if (readLen < 0) + if (XLogNeedData(state->reader_state, targetPagePtr, targetRecOff, + targetRecOff != 0)) + return true; + + if (!state->reader_state->page_verified) goto err; - header = (XLogPageHeader) state->readBuf; + header = (XLogPageHeader) state->reader_state->readBuf; pageHeaderSize = XLogPageHeaderSize(header); - /* make sure we have enough data for the page header */ - readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize); - if (readLen < 0) - goto err; + /* we should have read the page header */ + Assert(state->reader_state->readLen >= pageHeaderSize); /* skip over potential continuation data */ if (header->xlp_info & XLP_FIRST_IS_CONTRECORD) @@ -996,21 +1189,21 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) * Note that record headers are MAXALIGN'ed */ if (MAXALIGN(header->xlp_rem_len) >= (XLOG_BLCKSZ - pageHeaderSize)) - tmpRecPtr = targetPagePtr + XLOG_BLCKSZ; + state->currRecPtr = targetPagePtr + XLOG_BLCKSZ; else { /* * The previous continuation record ends in this page. 
Set - * tmpRecPtr to point to the first valid record + * state->currRecPtr to point to the first valid record */ - tmpRecPtr = targetPagePtr + pageHeaderSize + state->currRecPtr = targetPagePtr + pageHeaderSize + MAXALIGN(header->xlp_rem_len); break; } } else { - tmpRecPtr = targetPagePtr + pageHeaderSize; + state->currRecPtr = targetPagePtr + pageHeaderSize; break; } } @@ -1020,31 +1213,36 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) * because either we're at the first record after the beginning of a page * or we just jumped over the remaining data of a continuation. */ - XLogBeginRead(state, tmpRecPtr); - while (XLogReadRecord(state, &errormsg) != NULL) + XLogBeginRead(state->reader_state, state->currRecPtr); + while ((result = XLogReadRecord(state->reader_state, &record, &errormsg)) != + XLREAD_FAIL) { + if (result == XLREAD_NEED_DATA) + return true; + /* past the record we've found, break out */ - if (RecPtr <= state->ReadRecPtr) + if (state->targetRecPtr <= state->reader_state->ReadRecPtr) { /* Rewind the reader to the beginning of the last record. */ - found = state->ReadRecPtr; - XLogBeginRead(state, found); - return found; + state->currRecPtr = state->reader_state->ReadRecPtr; + XLogBeginRead(state->reader_state, state->currRecPtr); + return false; } } err: - XLogReaderInvalReadState(state); + XLogReaderInvalReadState(state->reader_state); - return InvalidXLogRecPtr; + state->currRecPtr = InvalidXLogRecPtr; + return false; } #endif /* FRONTEND */ /* - * Helper function to ease writing of XLogRoutine->page_read callbacks. - * If this function is used, caller must supply a segment_open callback in - * 'state', as that is used here. + * Helper function to ease writing of routines that read raw WAL data. + * If this function is used, caller must supply a segment_open callback and + * segment_close callback as those are used here. * * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL * fetched from timeline 'tli'.
@@ -1057,6 +1255,7 @@ err: */ bool WALRead(XLogReaderState *state, + WALSegmentOpenCB segopenfn, WALSegmentCloseCB segclosefn, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo) { @@ -1088,10 +1287,10 @@ WALRead(XLogReaderState *state, XLogSegNo nextSegNo; if (state->seg.ws_file >= 0) - state->routine.segment_close(state); + segclosefn(state); XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize); - state->routine.segment_open(state, nextSegNo, &tli); + segopenfn(state, nextSegNo, &tli); /* This shouldn't happen -- indicates a bug in segment_open */ Assert(state->seg.ws_file >= 0); diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index d17d660f46..e5de26dce5 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -686,8 +686,7 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum, void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength) { - const XLogRecPtr lastReadPage = (state->seg.ws_segno * - state->segcxt.ws_segsize + state->segoff); + const XLogRecPtr lastReadPage = state->readPagePtr; Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0); Assert(wantLength <= XLOG_BLCKSZ); @@ -702,7 +701,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa * current TLI has since become historical. */ if (lastReadPage == wantPage && - state->readLen != 0 && + state->page_verified && lastReadPage + state->readLen >= wantPage + Min(wantLength, XLOG_BLCKSZ - 1)) return; @@ -824,10 +823,12 @@ wal_segment_close(XLogReaderState *state) * exists for normal backends, so we have to do a check/sleep/repeat style of * loop for now. 
*/ -int -read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *cur_page) +bool +read_local_xlog_page(XLogReaderState *state) { + XLogRecPtr targetPagePtr = state->readPagePtr; + int reqLen = state->reqLen; + char *cur_page = state->readBuf; XLogRecPtr read_upto, loc; TimeLineID tli; @@ -926,7 +927,8 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, else if (targetPagePtr + reqLen > read_upto) { /* not enough data there */ - return -1; + XLogReaderSetInputData(state, -1); + return false; } else { @@ -939,12 +941,14 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, * as 'count', read the whole page anyway. It's guaranteed to be * zero-padded up to the page boundary if it's incomplete. */ - if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli, - &errinfo)) + if (!WALRead(state, wal_segment_open, wal_segment_close, + cur_page, targetPagePtr, XLOG_BLCKSZ, tli, &errinfo)) WALReadRaiseError(&errinfo); /* number of valid bytes in the buffer */ - return count; + state->readPagePtr = targetPagePtr; + XLogReaderSetInputData(state, count); + return true; } /* diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 2f6803637b..4f6e87f18d 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -148,7 +148,8 @@ StartupDecodingContext(List *output_plugin_options, TransactionId xmin_horizon, bool need_full_snapshot, bool fast_forward, - XLogReaderRoutine *xl_routine, + LogicalDecodingXLogPageReadCB page_read, + WALSegmentCleanupCB cleanup_cb, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress) @@ -198,11 +199,12 @@ StartupDecodingContext(List *output_plugin_options, ctx->slot = slot; - ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, ctx); + 
ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, cleanup_cb); if (!ctx->reader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory"))); + ctx->page_read = page_read; ctx->reorder = ReorderBufferAllocate(); ctx->snapshot_builder = @@ -319,7 +321,8 @@ CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, - XLogReaderRoutine *xl_routine, + LogicalDecodingXLogPageReadCB page_read, + WALSegmentCleanupCB cleanup_cb, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress) @@ -422,7 +425,7 @@ CreateInitDecodingContext(const char *plugin, ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon, need_full_snapshot, false, - xl_routine, prepare_write, do_write, + page_read, cleanup_cb, prepare_write, do_write, update_progress); /* call output plugin initialization callback */ @@ -476,7 +479,8 @@ LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, - XLogReaderRoutine *xl_routine, + LogicalDecodingXLogPageReadCB page_read, + WALSegmentCleanupCB cleanup_cb, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress) @@ -528,8 +532,8 @@ CreateDecodingContext(XLogRecPtr start_lsn, ctx = StartupDecodingContext(output_plugin_options, start_lsn, InvalidTransactionId, false, - fast_forward, xl_routine, prepare_write, - do_write, update_progress); + fast_forward, page_read, cleanup_cb, + prepare_write, do_write, update_progress); /* call output plugin initialization callback */ old_context = MemoryContextSwitchTo(ctx->context); @@ -585,7 +589,13 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) char *err = NULL; /* the read_page callback waits for new WAL */ - record = XLogReadRecord(ctx->reader, &err); + while 
(XLogReadRecord(ctx->reader, &record, &err) == + XLREAD_NEED_DATA) + { + if (!ctx->page_read(ctx->reader)) + break; + } + if (err) elog(ERROR, "%s", err); if (!record) diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 01d354829b..8f8c129620 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -233,9 +233,8 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin ctx = CreateDecodingContext(InvalidXLogRecPtr, options, false, - XL_ROUTINE(.page_read = read_local_xlog_page, - .segment_open = wal_segment_open, - .segment_close = wal_segment_close), + read_local_xlog_page, + wal_segment_close, LogicalOutputPrepareWrite, LogicalOutputWrite, NULL); @@ -284,7 +283,13 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin XLogRecord *record; char *errm = NULL; - record = XLogReadRecord(ctx->reader, &errm); + while (XLogReadRecord(ctx->reader, &record, &errm) == + XLREAD_NEED_DATA) + { + if (!ctx->page_read(ctx->reader)) + break; + } + if (errm) elog(ERROR, "%s", errm); diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index d9d36879ed..7ab0b804e4 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -153,9 +153,8 @@ create_logical_replication_slot(char *name, char *plugin, ctx = CreateInitDecodingContext(plugin, NIL, false, /* just catalogs is OK */ restart_lsn, - XL_ROUTINE(.page_read = read_local_xlog_page, - .segment_open = wal_segment_open, - .segment_close = wal_segment_close), + read_local_xlog_page, + wal_segment_close, NULL, NULL, NULL); /* @@ -512,9 +511,8 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) ctx = CreateDecodingContext(InvalidXLogRecPtr, NIL, true, /* fast_forward */ - XL_ROUTINE(.page_read = read_local_xlog_page, - .segment_open = wal_segment_open, - .segment_close = wal_segment_close), 
+ read_local_xlog_page, + wal_segment_close, NULL, NULL, NULL); /* @@ -536,7 +534,13 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) * Read records. No changes are generated in fast_forward mode, * but snapbuilder/slot statuses are updated properly. */ - record = XLogReadRecord(ctx->reader, &errm); + while (XLogReadRecord(ctx->reader, &record, &errm) == + XLREAD_NEED_DATA) + { + if (!ctx->page_read(ctx->reader)) + break; + } + if (errm) elog(ERROR, "%s", errm); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 4bf8a18e01..52fe9aba66 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -580,10 +580,7 @@ StartReplication(StartReplicationCmd *cmd) /* create xlogreader for physical replication */ xlogreader = - XLogReaderAllocate(wal_segment_size, NULL, - XL_ROUTINE(.segment_open = WalSndSegmentOpen, - .segment_close = wal_segment_close), - NULL); + XLogReaderAllocate(wal_segment_size, NULL, wal_segment_close); if (!xlogreader) ereport(ERROR, @@ -806,10 +803,12 @@ StartReplication(StartReplicationCmd *cmd) * which has to do a plain sleep/busy loop, because the walsender's latch gets * set every time WAL is flushed. 
*/ -static int -logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, - XLogRecPtr targetRecPtr, char *cur_page) +static bool +logical_read_xlog_page(XLogReaderState *state) { + XLogRecPtr targetPagePtr = state->readPagePtr; + int reqLen = state->reqLen; + char *cur_page = state->readBuf; XLogRecPtr flushptr; int count; WALReadError errinfo; @@ -826,7 +825,10 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req /* fail if not (implies we are going to shut down) */ if (flushptr < targetPagePtr + reqLen) - return -1; + { + XLogReaderSetInputData(state, -1); + return false; + } if (targetPagePtr + XLOG_BLCKSZ <= flushptr) count = XLOG_BLCKSZ; /* more than one block available */ @@ -834,7 +836,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req count = flushptr - targetPagePtr; /* part of the page available */ /* now actually read the data, we know it's there */ - if (!WALRead(state, + if (!WALRead(state, WalSndSegmentOpen, wal_segment_close, cur_page, targetPagePtr, XLOG_BLCKSZ, @@ -854,7 +856,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize); CheckXLogRemoved(segno, state->seg.ws_tli); - return count; + XLogReaderSetInputData(state, count); + return true; } /* @@ -1007,9 +1010,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot, InvalidXLogRecPtr, - XL_ROUTINE(.page_read = logical_read_xlog_page, - .segment_open = WalSndSegmentOpen, - .segment_close = wal_segment_close), + logical_read_xlog_page, + wal_segment_close, WalSndPrepareWrite, WalSndWriteData, WalSndUpdateProgress); @@ -1167,9 +1169,8 @@ StartLogicalReplication(StartReplicationCmd *cmd) */ logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options, false, - XL_ROUTINE(.page_read = logical_read_xlog_page, - .segment_open = 
WalSndSegmentOpen, - .segment_close = wal_segment_close), + logical_read_xlog_page, + wal_segment_close, WalSndPrepareWrite, WalSndWriteData, WalSndUpdateProgress); xlogreader = logical_decoding_ctx->reader; @@ -2745,7 +2746,7 @@ XLogSendPhysical(void) enlargeStringInfo(&output_message, nbytes); retry: - if (!WALRead(xlogreader, + if (!WALRead(xlogreader, WalSndSegmentOpen, wal_segment_close, &output_message.data[output_message.len], startptr, nbytes, @@ -2843,7 +2844,12 @@ XLogSendLogical(void) */ WalSndCaughtUp = false; - record = XLogReadRecord(logical_decoding_ctx->reader, &errm); + while (XLogReadRecord(logical_decoding_ctx->reader, &record, &errm) == + XLREAD_NEED_DATA) + { + if (!logical_decoding_ctx->page_read(logical_decoding_ctx->reader)) + break; + } /* xlog record was invalid */ if (errm != NULL) diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 59ebac7d6a..79f71c0477 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -41,15 +41,9 @@ static int xlogreadfd = -1; static XLogSegNo xlogreadsegno = -1; static char xlogfpath[MAXPGPATH]; -typedef struct XLogPageReadPrivate -{ - const char *restoreCommand; - int tliIndex; -} XLogPageReadPrivate; - -static int SimpleXLogPageRead(XLogReaderState *xlogreader, - XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *readBuf); +static bool SimpleXLogPageRead(XLogReaderState *xlogreader, + const char *datadir, int *tliIndex, + const char *restoreCommand); /* * Read WAL from the datadir/pg_wal, starting from 'startpoint' on timeline @@ -66,20 +60,22 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex, XLogRecord *record; XLogReaderState *xlogreader; char *errormsg; - XLogPageReadPrivate private; - private.tliIndex = tliIndex; - private.restoreCommand = restoreCommand; - xlogreader = XLogReaderAllocate(WalSegSz, datadir, - XL_ROUTINE(.page_read = &SimpleXLogPageRead), - &private); + xlogreader = 
XLogReaderAllocate(WalSegSz, datadir, NULL); + if (xlogreader == NULL) pg_fatal("out of memory"); XLogBeginRead(xlogreader, startpoint); do { - record = XLogReadRecord(xlogreader, &errormsg); + while (XLogReadRecord(xlogreader, &record, &errormsg) == + XLREAD_NEED_DATA) + { + if (!SimpleXLogPageRead(xlogreader, datadir, + &tliIndex, restoreCommand)) + break; + } if (record == NULL) { @@ -123,19 +119,19 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex, XLogRecord *record; XLogReaderState *xlogreader; char *errormsg; - XLogPageReadPrivate private; XLogRecPtr endptr; - private.tliIndex = tliIndex; - private.restoreCommand = restoreCommand; - xlogreader = XLogReaderAllocate(WalSegSz, datadir, - XL_ROUTINE(.page_read = &SimpleXLogPageRead), - &private); + xlogreader = XLogReaderAllocate(WalSegSz, datadir, NULL); if (xlogreader == NULL) pg_fatal("out of memory"); XLogBeginRead(xlogreader, ptr); - record = XLogReadRecord(xlogreader, &errormsg); + while (XLogReadRecord(xlogreader, &record, &errormsg) == + XLREAD_NEED_DATA) + { + if (!SimpleXLogPageRead(xlogreader, datadir, &tliIndex, restoreCommand)) + break; + } if (record == NULL) { if (errormsg) @@ -170,7 +166,6 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, XLogRecPtr searchptr; XLogReaderState *xlogreader; char *errormsg; - XLogPageReadPrivate private; /* * The given fork pointer points to the end of the last common record, @@ -186,11 +181,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, forkptr += SizeOfXLogShortPHD; } - private.tliIndex = tliIndex; - private.restoreCommand = restoreCommand; - xlogreader = XLogReaderAllocate(WalSegSz, datadir, - XL_ROUTINE(.page_read = &SimpleXLogPageRead), - &private); + xlogreader = XLogReaderAllocate(WalSegSz, datadir, NULL); if (xlogreader == NULL) pg_fatal("out of memory"); @@ -200,7 +191,13 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, uint8 info; 
XLogBeginRead(xlogreader, searchptr); - record = XLogReadRecord(xlogreader, &errormsg); + while (XLogReadRecord(xlogreader, &record, &errormsg) == + XLREAD_NEED_DATA) + { + if (!SimpleXLogPageRead(xlogreader, datadir, + &tliIndex, restoreCommand)) + break; + } if (record == NULL) { @@ -246,16 +243,19 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, } /* XLogReader callback function, to read a WAL page */ -static int -SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *readBuf) +static bool +SimpleXLogPageRead(XLogReaderState *xlogreader, const char *datadir, + int *tliIndex, const char *restoreCommand) { - XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data; + XLogRecPtr targetPagePtr = xlogreader->readPagePtr; + char *readBuf = xlogreader->readBuf; uint32 targetPageOff; XLogRecPtr targetSegEnd; XLogSegNo targetSegNo; int r; + Assert(xlogreader->reqLen <= XLOG_BLCKSZ); + XLByteToSeg(targetPagePtr, targetSegNo, WalSegSz); XLogSegNoOffsetToRecPtr(targetSegNo + 1, 0, WalSegSz, targetSegEnd); targetPageOff = XLogSegmentOffset(targetPagePtr, WalSegSz); @@ -283,14 +283,14 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, * be done both forward and backward, consider also switching timeline * accordingly. 
*/ - while (private->tliIndex < targetNentries - 1 && - targetHistory[private->tliIndex].end < targetSegEnd) - private->tliIndex++; - while (private->tliIndex > 0 && - targetHistory[private->tliIndex].begin >= targetSegEnd) - private->tliIndex--; + while (*tliIndex < targetNentries - 1 && + targetHistory[*tliIndex].end < targetSegEnd) + (*tliIndex)++; + while (*tliIndex > 0 && + targetHistory[*tliIndex].begin >= targetSegEnd) + (*tliIndex)--; - XLogFileName(xlogfname, targetHistory[private->tliIndex].tli, + XLogFileName(xlogfname, targetHistory[*tliIndex].tli, xlogreadsegno, WalSegSz); snprintf(xlogfpath, MAXPGPATH, "%s/" XLOGDIR "/%s", @@ -303,10 +303,11 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, /* * If we have no restore_command to execute, then exit. */ - if (private->restoreCommand == NULL) + if (restoreCommand == NULL) { pg_log_error("could not open file \"%s\": %m", xlogfpath); - return -1; + XLogReaderSetInputData(xlogreader, -1); + return false; } /* @@ -316,10 +317,13 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, xlogreadfd = RestoreArchivedFile(xlogreader->segcxt.ws_dir, xlogfname, WalSegSz, - private->restoreCommand); + restoreCommand); if (xlogreadfd < 0) - return -1; + { + XLogReaderSetInputData(xlogreader, -1); + return false; + } else pg_log_debug("using file \"%s\" restored from archive", xlogfpath); @@ -335,7 +339,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, if (lseek(xlogreadfd, (off_t) targetPageOff, SEEK_SET) < 0) { pg_log_error("could not seek in file \"%s\": %m", xlogfpath); - return -1; + XLogReaderSetInputData(xlogreader, -1); + return false; } @@ -348,13 +353,15 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, pg_log_error("could not read file \"%s\": read %d of %zu", xlogfpath, r, (Size) XLOG_BLCKSZ); - return -1; + XLogReaderSetInputData(xlogreader, -1); + return false; } Assert(targetSegNo == xlogreadsegno); - 
xlogreader->seg.ws_tli = targetHistory[private->tliIndex].tli; - return XLOG_BLCKSZ; + xlogreader->seg.ws_tli = targetHistory[*tliIndex].tli; + XLogReaderSetInputData(xlogreader, XLOG_BLCKSZ); + return true; } /* diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index f8b8afe4a7..5db389aa2d 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -29,14 +29,6 @@ static const char *progname; static int WalSegSz; -typedef struct XLogDumpPrivate -{ - TimeLineID timeline; - XLogRecPtr startptr; - XLogRecPtr endptr; - bool endptr_reached; -} XLogDumpPrivate; - typedef struct XLogDumpConfig { /* display options */ @@ -330,30 +322,41 @@ WALDumpCloseSegment(XLogReaderState *state) state->seg.ws_file = -1; } -/* pg_waldump's XLogReaderRoutine->page_read callback */ -static int -WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, - XLogRecPtr targetPtr, char *readBuff) +/* + * pg_waldump's WAL page reader + * + * timeline and startptr specify the LSN, and data is read up to endptr. 
+ */ +static bool +WALDumpReadPage(XLogReaderState *state, TimeLineID timeline, + XLogRecPtr startptr, XLogRecPtr endptr) { - XLogDumpPrivate *private = state->private_data; + XLogRecPtr targetPagePtr = state->readPagePtr; + int reqLen = state->reqLen; + char *readBuff = state->readBuf; int count = XLOG_BLCKSZ; WALReadError errinfo; - if (private->endptr != InvalidXLogRecPtr) + /* determine the number of bytes to read on the page */ + if (endptr != InvalidXLogRecPtr) { - if (targetPagePtr + XLOG_BLCKSZ <= private->endptr) + if (targetPagePtr + XLOG_BLCKSZ <= endptr) count = XLOG_BLCKSZ; - else if (targetPagePtr + reqLen <= private->endptr) - count = private->endptr - targetPagePtr; + else if (targetPagePtr + reqLen <= endptr) + count = endptr - targetPagePtr; else { - private->endptr_reached = true; - return -1; + /* Notify xlogreader that we didn't read anything */ + XLogReaderSetInputData(state, -1); + return false; } } - if (!WALRead(state, readBuff, targetPagePtr, count, private->timeline, - &errinfo)) + /* We must read at least as many bytes as xlogreader requested */ + Assert(count >= reqLen); + + if (!WALRead(state, WALDumpOpenSegment, WALDumpCloseSegment, + readBuff, targetPagePtr, count, timeline, &errinfo)) { WALOpenSegment *seg = &errinfo.wre_seg; char fname[MAXPGPATH]; @@ -373,7 +376,9 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, (Size) errinfo.wre_req); } - return count; + /* Notify xlogreader of how many bytes we have read */ + XLogReaderSetInputData(state, count); + return true; } /* @@ -754,7 +759,10 @@ main(int argc, char **argv) uint32 xlogid; uint32 xrecoff; XLogReaderState *xlogreader_state; - XLogDumpPrivate private; + XLogFindNextRecordState *findnext_state; + TimeLineID timeline; + XLogRecPtr startptr; + XLogRecPtr endptr; XLogDumpConfig config; XLogDumpStats stats; XLogRecord *record; @@ -800,14 +808,12 @@ main(int argc, char **argv) } } - memset(&private, 0, sizeof(XLogDumpPrivate)); memset(&config, 0, 
sizeof(XLogDumpConfig)); memset(&stats, 0, sizeof(XLogDumpStats)); - private.timeline = 1; - private.startptr = InvalidXLogRecPtr; - private.endptr = InvalidXLogRecPtr; - private.endptr_reached = false; + timeline = 1; + startptr = InvalidXLogRecPtr; + endptr = InvalidXLogRecPtr; config.quiet = false; config.bkp_details = false; @@ -841,7 +847,7 @@ main(int argc, char **argv) optarg); goto bad_argument; } - private.endptr = (uint64) xlogid << 32 | xrecoff; + endptr = (uint64) xlogid << 32 | xrecoff; break; case 'f': config.follow = true; @@ -894,10 +900,10 @@ main(int argc, char **argv) goto bad_argument; } else - private.startptr = (uint64) xlogid << 32 | xrecoff; + startptr = (uint64) xlogid << 32 | xrecoff; break; case 't': - if (sscanf(optarg, "%d", &private.timeline) != 1) + if (sscanf(optarg, "%d", &timeline) != 1) { pg_log_error("could not parse timeline \"%s\"", optarg); goto bad_argument; @@ -974,21 +980,21 @@ main(int argc, char **argv) close(fd); /* parse position from file */ - XLogFromFileName(fname, &private.timeline, &segno, WalSegSz); + XLogFromFileName(fname, &timeline, &segno, WalSegSz); - if (XLogRecPtrIsInvalid(private.startptr)) - XLogSegNoOffsetToRecPtr(segno, 0, WalSegSz, private.startptr); - else if (!XLByteInSeg(private.startptr, segno, WalSegSz)) + if (XLogRecPtrIsInvalid(startptr)) + XLogSegNoOffsetToRecPtr(segno, 0, WalSegSz, startptr); + else if (!XLByteInSeg(startptr, segno, WalSegSz)) { pg_log_error("start WAL location %X/%X is not inside file \"%s\"", - LSN_FORMAT_ARGS(private.startptr), + LSN_FORMAT_ARGS(startptr), fname); goto bad_argument; } /* no second file specified, set end position */ - if (!(optind + 1 < argc) && XLogRecPtrIsInvalid(private.endptr)) - XLogSegNoOffsetToRecPtr(segno + 1, 0, WalSegSz, private.endptr); + if (!(optind + 1 < argc) && XLogRecPtrIsInvalid(endptr)) + XLogSegNoOffsetToRecPtr(segno + 1, 0, WalSegSz, endptr); /* parse ENDSEG if passed */ if (optind + 1 < argc) @@ -1004,26 +1010,26 @@ main(int argc, char 
**argv) close(fd); /* parse position from file */ - XLogFromFileName(fname, &private.timeline, &endsegno, WalSegSz); + XLogFromFileName(fname, &timeline, &endsegno, WalSegSz); if (endsegno < segno) fatal_error("ENDSEG %s is before STARTSEG %s", argv[optind + 1], argv[optind]); - if (XLogRecPtrIsInvalid(private.endptr)) + if (XLogRecPtrIsInvalid(endptr)) XLogSegNoOffsetToRecPtr(endsegno + 1, 0, WalSegSz, - private.endptr); + endptr); /* set segno to endsegno for check of --end */ segno = endsegno; } - if (!XLByteInSeg(private.endptr, segno, WalSegSz) && - private.endptr != (segno + 1) * WalSegSz) + if (!XLByteInSeg(endptr, segno, WalSegSz) && + endptr != (segno + 1) * WalSegSz) { pg_log_error("end WAL location %X/%X is not inside file \"%s\"", - LSN_FORMAT_ARGS(private.endptr), + LSN_FORMAT_ARGS(endptr), argv[argc - 1]); goto bad_argument; } @@ -1032,7 +1038,7 @@ main(int argc, char **argv) waldir = identify_target_directory(waldir, NULL); /* we don't know what to print */ - if (XLogRecPtrIsInvalid(private.startptr)) + if (XLogRecPtrIsInvalid(startptr)) { pg_log_error("no start WAL location given"); goto bad_argument; @@ -1042,42 +1048,56 @@ main(int argc, char **argv) /* we have everything we need, start reading */ xlogreader_state = - XLogReaderAllocate(WalSegSz, waldir, - XL_ROUTINE(.page_read = WALDumpReadPage, - .segment_open = WALDumpOpenSegment, - .segment_close = WALDumpCloseSegment), - &private); + XLogReaderAllocate(WalSegSz, waldir, WALDumpCloseSegment); + if (!xlogreader_state) fatal_error("out of memory"); - /* first find a valid recptr to start from */ - first_record = XLogFindNextRecord(xlogreader_state, private.startptr); + findnext_state = + InitXLogFindNextRecord(xlogreader_state, startptr); + if (!findnext_state) + fatal_error("out of memory"); + + /* first find a valid recptr to start from */ + while (XLogFindNextRecord(findnext_state)) + { + if (!WALDumpReadPage(xlogreader_state, timeline, startptr, endptr)) + break; + } + + first_record = 
findnext_state->currRecPtr; if (first_record == InvalidXLogRecPtr) fatal_error("could not find a valid record after %X/%X", - LSN_FORMAT_ARGS(private.startptr)); + LSN_FORMAT_ARGS(startptr)); /* * Display a message that we're skipping data if `from` wasn't a pointer * to the start of a record and also wasn't a pointer to the beginning of * a segment (e.g. we were used in file mode). */ - if (first_record != private.startptr && - XLogSegmentOffset(private.startptr, WalSegSz) != 0) + if (first_record != startptr && + XLogSegmentOffset(startptr, WalSegSz) != 0) printf(ngettext("first record is after %X/%X, at %X/%X, skipping over %u byte\n", "first record is after %X/%X, at %X/%X, skipping over %u bytes\n", - (first_record - private.startptr)), - LSN_FORMAT_ARGS(private.startptr), + (first_record - startptr)), + LSN_FORMAT_ARGS(startptr), LSN_FORMAT_ARGS(first_record), - (uint32) (first_record - private.startptr)); + (uint32) (first_record - startptr)); for (;;) { /* try to read the next record */ - record = XLogReadRecord(xlogreader_state, &errormsg); + while (XLogReadRecord(xlogreader_state, &record, &errormsg) == + XLREAD_NEED_DATA) + { + if (!WALDumpReadPage(xlogreader_state, timeline, startptr, endptr)) + break; + } + if (!record) { - if (!config.follow || private.endptr_reached) + if (!config.follow) break; else { diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 21d200d3df..d27c0cd281 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -56,65 +56,17 @@ typedef struct WALSegmentContext } WALSegmentContext; typedef struct XLogReaderState XLogReaderState; +typedef struct XLogFindNextRecordState XLogFindNextRecordState; -/* Function type definitions for various xlogreader interactions */ -typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader, - XLogRecPtr targetPagePtr, - int reqLen, - XLogRecPtr targetRecPtr, - char *readBuf); +/* Function type definition for the segment cleanup callback */ 
+typedef void (*WALSegmentCleanupCB) (XLogReaderState *xlogreader); + +/* Function type definition for the open/close callbacks for WALRead() */ typedef void (*WALSegmentOpenCB) (XLogReaderState *xlogreader, XLogSegNo nextSegNo, TimeLineID *tli_p); typedef void (*WALSegmentCloseCB) (XLogReaderState *xlogreader); -typedef struct XLogReaderRoutine -{ - /* - * Data input callback - * - * This callback shall read at least reqLen valid bytes of the xlog page - * starting at targetPagePtr, and store them in readBuf. The callback - * shall return the number of bytes read (never more than XLOG_BLCKSZ), or - * -1 on failure. The callback shall sleep, if necessary, to wait for the - * requested bytes to become available. The callback will not be invoked - * again for the same page unless more than the returned number of bytes - * are needed. - * - * targetRecPtr is the position of the WAL record we're reading. Usually - * it is equal to targetPagePtr + reqLen, but sometimes xlogreader needs - * to read and verify the page or segment header, before it reads the - * actual WAL record it's interested in. In that case, targetRecPtr can - * be used to determine which timeline to read the page from. - * - * The callback shall set ->seg.ws_tli to the TLI of the file the page was - * read from. - */ - XLogPageReadCB page_read; - - /* - * Callback to open the specified WAL segment for reading. ->seg.ws_file - * shall be set to the file descriptor of the opened segment. In case of - * failure, an error shall be raised by the callback and it shall not - * return. - * - * "nextSegNo" is the number of the segment to be opened. - * - * "tli_p" is an input/output argument. WALRead() uses it to pass the - * timeline in which the new segment should be found, but the callback can - * use it to return the TLI that it actually opened. - */ - WALSegmentOpenCB segment_open; - - /* - * WAL segment close callback. ->seg.ws_file shall be set to a negative - * number. 
- */ - WALSegmentCloseCB segment_close; -} XLogReaderRoutine; - -#define XL_ROUTINE(...) &(XLogReaderRoutine){__VA_ARGS__} - typedef struct { /* Is this block ref in use? */ @@ -144,12 +96,36 @@ typedef struct uint16 data_bufsz; } DecodedBkpBlock; +/* Return code from XLogReadRecord */ +typedef enum XLogReadRecordResult +{ + XLREAD_SUCCESS, /* record is successfully read */ + XLREAD_NEED_DATA, /* need more data. see XLogReadRecord. */ + XLREAD_FAIL /* failed during reading a record */ +} XLogReadRecordResult; + +/* + * internal state of XLogReadRecord + * + * XLogReadRecord runs a state machine while reading a record. These states + * are not seen outside the function. Each state may repeat several times, + * exiting to request new data from the caller. See the comment of XLogReadRecord + * for details. + */ +typedef enum XLogReadRecordState +{ + XLREAD_NEXT_RECORD, + XLREAD_TOT_LEN, + XLREAD_FIRST_FRAGMENT, + XLREAD_CONTINUATION +} XLogReadRecordState; + struct XLogReaderState { /* * Operational callbacks */ - XLogReaderRoutine routine; + WALSegmentCleanupCB cleanup_cb; /* ---------------------------------------- * Public parameters @@ -162,19 +138,31 @@ struct XLogReaderState */ uint64 system_identifier; - /* - * Opaque data for callbacks to use. Not used by XLogReader. - */ - void *private_data; - /* * Start and end point of last record read. EndRecPtr is also used as the * position to read next. Calling XLogBeginRead() sets EndRecPtr to the * starting position and ReadRecPtr to invalid. */ - XLogRecPtr ReadRecPtr; /* start of last record read */ + XLogRecPtr ReadRecPtr; /* start of last record read or being read */ XLogRecPtr EndRecPtr; /* end+1 of last record read */ + XLogRecPtr PrevRecPtr; /* start of previous record read */ + /* ---------------------------------------- + * Communication with page reader + * readBuf is XLOG_BLCKSZ bytes, valid up to at least reqLen bytes. 
+ * ---------------------------------------- + */ + /* variables the clients of xlogreader can examine */ + XLogRecPtr readPagePtr; /* page pointer to read */ + int32 reqLen; /* bytes requested to the caller */ + char *readBuf; /* buffer to store data */ + bool page_verified; /* is the page header on the buffer verified? */ + bool record_verified;/* is the current record header verified? */ + + /* variables set by the client of xlogreader */ + int32 readLen; /* actual bytes copied into readBuf by client, + * which should be >= reqLen. Client should + * use XLogReaderSetInputData() to set. */ /* ---------------------------------------- * Decoded representation of current record @@ -203,13 +191,6 @@ struct XLogReaderState * ---------------------------------------- */ - /* - * Buffer for currently read page (XLOG_BLCKSZ bytes, valid up to at least - * readLen bytes) - */ - char *readBuf; - uint32 readLen; - /* last read XLOG position for data currently in readBuf */ WALSegmentContext segcxt; WALOpenSegment seg; @@ -222,8 +203,6 @@ struct XLogReaderState XLogRecPtr latestPagePtr; TimeLineID latestPageTLI; - /* beginning of the WAL record being read. */ - XLogRecPtr currRecPtr; /* timeline to read it from, 0 if a lookup is required */ TimeLineID currTLI; @@ -250,16 +229,37 @@ struct XLogReaderState char *readRecordBuf; uint32 readRecordBufSize; + /* + * XLogReadRecord() state + */ + XLogReadRecordState readRecordState; /* state machine state */ + int recordGotLen; /* amount of current record that has already + * been read */ + int recordRemainLen; /* length of current record that remains */ + XLogRecPtr recordContRecPtr; /* where the current record continues */ + /* Buffer to hold error message */ char *errormsg_buf; }; +struct XLogFindNextRecordState +{ + XLogReaderState *reader_state; + XLogRecPtr targetRecPtr; + XLogRecPtr currRecPtr; +}; + +/* Report that data is available for decoding. 
*/ +static inline void +XLogReaderSetInputData(XLogReaderState *state, int32 len) +{ + state->readLen = len; +} + /* Get a new XLogReader */ extern XLogReaderState *XLogReaderAllocate(int wal_segment_size, const char *waldir, - XLogReaderRoutine *routine, - void *private_data); -extern XLogReaderRoutine *LocalXLogReaderRoutine(void); + WALSegmentCleanupCB cleanup_cb); /* Free an XLogReader */ extern void XLogReaderFree(XLogReaderState *state); @@ -267,12 +267,14 @@ extern void XLogReaderFree(XLogReaderState *state); /* Position the XLogReader to given record */ extern void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr); #ifdef FRONTEND -extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr); +extern XLogFindNextRecordState *InitXLogFindNextRecord(XLogReaderState *reader_state, XLogRecPtr start_ptr); +extern bool XLogFindNextRecord(XLogFindNextRecordState *state); #endif /* FRONTEND */ /* Read the next XLog record. Returns NULL on end-of-WAL or failure */ -extern struct XLogRecord *XLogReadRecord(XLogReaderState *state, - char **errormsg); +extern XLogReadRecordResult XLogReadRecord(XLogReaderState *state, + XLogRecord **record, + char **errormsg); /* Validate a page */ extern bool XLogReaderValidatePageHeader(XLogReaderState *state, @@ -292,6 +294,7 @@ typedef struct WALReadError } WALReadError; extern bool WALRead(XLogReaderState *state, + WALSegmentOpenCB segopenfn, WALSegmentCloseCB sgclosefn, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo); diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index 9ac602b674..397fb27fc2 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -47,9 +47,7 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum, extern Relation CreateFakeRelcacheEntry(RelFileNode rnode); extern void FreeFakeRelcacheEntry(Relation fakerel); -extern int read_local_xlog_page(XLogReaderState *state, - 
XLogRecPtr targetPagePtr, int reqLen, - XLogRecPtr targetRecPtr, char *cur_page); +extern bool read_local_xlog_page(XLogReaderState *state); extern void wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p); diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index af551d6f4e..94e278ef81 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -29,6 +29,10 @@ typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingC TransactionId xid ); +typedef struct LogicalDecodingContext LogicalDecodingContext; + +typedef bool (*LogicalDecodingXLogPageReadCB)(XLogReaderState *ctx); + typedef struct LogicalDecodingContext { /* memory context this is all allocated in */ @@ -39,6 +43,7 @@ typedef struct LogicalDecodingContext /* infrastructure pieces for decoding */ XLogReaderState *reader; + LogicalDecodingXLogPageReadCB page_read; struct ReorderBuffer *reorder; struct SnapBuild *snapshot_builder; @@ -105,14 +110,16 @@ extern LogicalDecodingContext *CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, - XLogReaderRoutine *xl_routine, + LogicalDecodingXLogPageReadCB page_read, + WALSegmentCleanupCB cleanup_cb, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress); extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, - XLogReaderRoutine *xl_routine, + LogicalDecodingXLogPageReadCB page_read, + WALSegmentCleanupCB cleanup_cb, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress);