diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index dcf747c633..f2da505892 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -866,46 +866,83 @@ XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) { XLogReaderState saved_state = *state; - XLogRecPtr targetPagePtr; XLogRecPtr tmpRecPtr; - int targetRecOff; XLogRecPtr found = InvalidXLogRecPtr; - uint32 pageHeaderSize; XLogPageHeader header; - int readLen; char *errormsg; Assert(!XLogRecPtrIsInvalid(RecPtr)); - targetRecOff = RecPtr % XLOG_BLCKSZ; - - /* scroll back to page boundary */ - targetPagePtr = RecPtr - targetRecOff; - - /* Read the page containing the record */ - readLen = ReadPageInternal(state, targetPagePtr, targetRecOff); - if (readLen < 0) - goto err; - - header = (XLogPageHeader) state->readBuf; - - pageHeaderSize = XLogPageHeaderSize(header); - - /* make sure we have enough data for the page header */ - readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize); - if (readLen < 0) - goto err; - - /* skip over potential continuation data */ - if (header->xlp_info & XLP_FIRST_IS_CONTRECORD) + /* + * skip over potential continuation data, keeping in mind that it may span + * multiple pages + */ + tmpRecPtr = RecPtr; + while (true) { - /* record headers are MAXALIGN'ed */ - tmpRecPtr = targetPagePtr + pageHeaderSize - + MAXALIGN(header->xlp_rem_len); - } - else - { - tmpRecPtr = targetPagePtr + pageHeaderSize; + XLogRecPtr targetPagePtr; + int targetRecOff; + uint32 pageHeaderSize; + int readLen; + + /* + * Compute targetRecOff. It should typically be equal or greater than + * short page-header since a valid record can't start anywhere before + * that, except when caller has explicitly specified the offset that + * falls somewhere there or when we are skipping multi-page + * continuation record. It doesn't matter though because + * ReadPageInternal() is prepared to handle that and will read at least + * short page-header worth of data + */ + targetRecOff = tmpRecPtr % XLOG_BLCKSZ; + + /* scroll back to page boundary */ + targetPagePtr = tmpRecPtr - targetRecOff; + + /* Read the page containing the record */ + readLen = ReadPageInternal(state, targetPagePtr, targetRecOff); + if (readLen < 0) + goto err; + + header = (XLogPageHeader) state->readBuf; + + pageHeaderSize = XLogPageHeaderSize(header); + + /* make sure we have enough data for the page header */ + readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize); + if (readLen < 0) + goto err; + + /* skip over potential continuation data */ + if (header->xlp_info & XLP_FIRST_IS_CONTRECORD) + { + /* + * If the length of the remaining continuation data is more than + * what can fit in this page, the continuation record crosses over + * this page. Read the next page and try again. xlp_rem_len in the + * next page header will contain the remaining length of the + * continuation data + * + * Note that record headers are MAXALIGN'ed + */ + if (MAXALIGN(header->xlp_rem_len) > (XLOG_BLCKSZ - pageHeaderSize)) + tmpRecPtr = targetPagePtr + XLOG_BLCKSZ; + else + { + /* + * The previous continuation record ends in this page. Set + * tmpRecPtr to point to the first valid record + */ + tmpRecPtr = targetPagePtr + pageHeaderSize + + MAXALIGN(header->xlp_rem_len); + break; + } + } + else + { + tmpRecPtr = targetPagePtr + pageHeaderSize; + break; + } } /*