diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index 700cfd85c0..eb6cfc5c44 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -14,7 +14,7 @@ include $(top_builddir)/src/Makefile.global OBJS = clog.o transam.o varsup.o xact.o rmgr.o slru.o subtrans.o multixact.o \ timeline.o twophase.o twophase_rmgr.o xlog.o xlogarchive.o xlogfuncs.o \ - xlogutils.o + xlogreader.o xlogutils.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 51a515a555..70cfabc236 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -30,6 +30,7 @@ #include "access/twophase.h" #include "access/xact.h" #include "access/xlog_internal.h" +#include "access/xlogreader.h" #include "access/xlogutils.h" #include "catalog/catversion.h" #include "catalog/pg_control.h" @@ -548,7 +549,6 @@ static int readFile = -1; static XLogSegNo readSegNo = 0; static uint32 readOff = 0; static uint32 readLen = 0; -static bool readFileHeaderValidated = false; static XLogSource readSource = 0; /* XLOG_FROM_* code */ /* @@ -561,6 +561,13 @@ static XLogSource readSource = 0; /* XLOG_FROM_* code */ static XLogSource currentSource = 0; /* XLOG_FROM_* code */ static bool lastSourceFailed = false; +typedef struct XLogPageReadPrivate +{ + int emode; + bool fetching_ckpt; /* are we fetching a checkpoint record? */ + bool randAccess; +} XLogPageReadPrivate; + /* * These variables track when we last obtained some WAL data to process, * and where we got it from. 
(XLogReceiptSource is initially the same as @@ -572,18 +579,9 @@ static bool lastSourceFailed = false; static TimestampTz XLogReceiptTime = 0; static XLogSource XLogReceiptSource = 0; /* XLOG_FROM_* code */ -/* Buffer for currently read page (XLOG_BLCKSZ bytes) */ -static char *readBuf = NULL; - -/* Buffer for current ReadRecord result (expandable) */ -static char *readRecordBuf = NULL; -static uint32 readRecordBufSize = 0; - /* State information for XLOG reading */ static XLogRecPtr ReadRecPtr; /* start of last record read */ static XLogRecPtr EndRecPtr; /* end+1 of last record read */ -static TimeLineID lastPageTLI = 0; -static TimeLineID lastSegmentTLI = 0; static XLogRecPtr minRecoveryPoint; /* local copy of * ControlFile->minRecoveryPoint */ @@ -627,8 +625,8 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, int source, bool notexistOk); static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source); -static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, - bool randAccess); +static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, + int reqLen, char *readBuf, TimeLineID *readTLI); static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, bool fetching_ckpt); static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr); @@ -639,12 +637,11 @@ static void UpdateLastRemovedPtr(char *filename); static void ValidateXLOGDirectoryStructure(void); static void CleanupBackupHistory(void); static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force); -static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt); +static XLogRecord *ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, + int emode, bool fetching_ckpt); static void CheckRecoveryConsistency(void); -static bool ValidXLogPageHeader(XLogPageHeader hdr, int emode, bool segmentonly); -static bool 
ValidXLogRecordHeader(XLogRecPtr *RecPtr, XLogRecord *record, - int emode, bool randAccess); -static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt); +static XLogRecord *ReadCheckpointRecord(XLogReaderState *xlogreader, + XLogRecPtr RecPtr, int whichChkpt); static bool rescanLatestTimeLine(void); static void WriteControlFile(void); static void ReadControlFile(void); @@ -2652,9 +2649,6 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, if (source != XLOG_FROM_STREAM) XLogReceiptTime = GetCurrentTimestamp(); - /* The file header needs to be validated on first access */ - readFileHeaderValidated = false; - return fd; } if (errno != ENOENT || !notfoundOk) /* unexpected failure? */ @@ -2709,7 +2703,8 @@ XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source) if (source == XLOG_FROM_ANY || source == XLOG_FROM_ARCHIVE) { - fd = XLogFileRead(segno, emode, tli, XLOG_FROM_ARCHIVE, true); + fd = XLogFileRead(segno, emode, tli, + XLOG_FROM_ARCHIVE, true); if (fd != -1) { elog(DEBUG1, "got WAL segment from archive"); @@ -2721,7 +2716,8 @@ XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source) if (source == XLOG_FROM_ANY || source == XLOG_FROM_PG_XLOG) { - fd = XLogFileRead(segno, emode, tli, XLOG_FROM_PG_XLOG, true); + fd = XLogFileRead(segno, emode, tli, + XLOG_FROM_PG_XLOG, true); if (fd != -1) { if (!expectedTLEs) @@ -3177,102 +3173,6 @@ RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index, return InvalidBuffer; /* keep compiler quiet */ } -/* - * CRC-check an XLOG record. We do not believe the contents of an XLOG - * record (other than to the minimal extent of computing the amount of - * data to read in) until we've checked the CRCs. - * - * We assume all of the record (that is, xl_tot_len bytes) has been read - * into memory at *record. Also, ValidXLogRecordHeader() has accepted the - * record's header, which means in particular that xl_tot_len is at least - * SizeOfXlogRecord, so it is safe to fetch xl_len. 
- */ -static bool -RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode) -{ - pg_crc32 crc; - int i; - uint32 len = record->xl_len; - BkpBlock bkpb; - char *blk; - size_t remaining = record->xl_tot_len; - - /* First the rmgr data */ - if (remaining < SizeOfXLogRecord + len) - { - /* ValidXLogRecordHeader() should've caught this already... */ - ereport(emode_for_corrupt_record(emode, recptr), - (errmsg("invalid record length at %X/%X", - (uint32) (recptr >> 32), (uint32) recptr))); - return false; - } - remaining -= SizeOfXLogRecord + len; - INIT_CRC32(crc); - COMP_CRC32(crc, XLogRecGetData(record), len); - - /* Add in the backup blocks, if any */ - blk = (char *) XLogRecGetData(record) + len; - for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) - { - uint32 blen; - - if (!(record->xl_info & XLR_BKP_BLOCK(i))) - continue; - - if (remaining < sizeof(BkpBlock)) - { - ereport(emode_for_corrupt_record(emode, recptr), - (errmsg("invalid backup block size in record at %X/%X", - (uint32) (recptr >> 32), (uint32) recptr))); - return false; - } - memcpy(&bkpb, blk, sizeof(BkpBlock)); - - if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ) - { - ereport(emode_for_corrupt_record(emode, recptr), - (errmsg("incorrect hole size in record at %X/%X", - (uint32) (recptr >> 32), (uint32) recptr))); - return false; - } - blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length; - - if (remaining < blen) - { - ereport(emode_for_corrupt_record(emode, recptr), - (errmsg("invalid backup block size in record at %X/%X", - (uint32) (recptr >> 32), (uint32) recptr))); - return false; - } - remaining -= blen; - COMP_CRC32(crc, blk, blen); - blk += blen; - } - - /* Check that xl_tot_len agrees with our calculation */ - if (remaining != 0) - { - ereport(emode_for_corrupt_record(emode, recptr), - (errmsg("incorrect total length in record at %X/%X", - (uint32) (recptr >> 32), (uint32) recptr))); - return false; - } - - /* Finally include the record header */ - COMP_CRC32(crc, (char *) record, 
offsetof(XLogRecord, xl_crc)); - FIN_CRC32(crc); - - if (!EQ_CRC32(record->xl_crc, crc)) - { - ereport(emode_for_corrupt_record(emode, recptr), - (errmsg("incorrect resource manager data checksum in record at %X/%X", - (uint32) (recptr >> 32), (uint32) recptr))); - return false; - } - - return true; -} - /* * Attempt to read an XLOG record. * @@ -3286,511 +3186,68 @@ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode) * the returned record pointer always points there. */ static XLogRecord * -ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt) +ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode, + bool fetching_ckpt) { XLogRecord *record; - XLogRecPtr tmpRecPtr = EndRecPtr; - bool randAccess = false; - uint32 len, - total_len; - uint32 targetRecOff; - uint32 pageHeaderSize; - bool gotheader; + XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data; - if (readBuf == NULL) - { - /* - * First time through, permanently allocate readBuf. We do it this - * way, rather than just making a static array, for two reasons: (1) - * no need to waste the storage in most instantiations of the backend; - * (2) a static char array isn't guaranteed to have any particular - * alignment, whereas malloc() will provide MAXALIGN'd storage. - */ - readBuf = (char *) malloc(XLOG_BLCKSZ); - Assert(readBuf != NULL); - } - - if (RecPtr == NULL) - { - RecPtr = &tmpRecPtr; - - /* - * RecPtr is pointing to end+1 of the previous WAL record. If - * we're at a page boundary, no more records can fit on the current - * page. We must skip over the page header, but we can't do that - * until we've read in the page, since the header size is variable. - */ - } - else - { - /* - * In this case, the passed-in record pointer should already be - * pointing to a valid record starting position. 
- */ - if (!XRecOffIsValid(*RecPtr)) - ereport(PANIC, - (errmsg("invalid record offset at %X/%X", - (uint32) (*RecPtr >> 32), (uint32) *RecPtr))); - - /* - * Since we are going to a random position in WAL, forget any prior - * state about what timeline we were in, and allow it to be any - * timeline in expectedTLEs. We also set a flag to allow curFileTLI - * to go backwards (but we can't reset that variable right here, since - * we might not change files at all). - */ - /* see comment in ValidXLogPageHeader */ - lastPageTLI = lastSegmentTLI = 0; - randAccess = true; /* allow curFileTLI to go backwards too */ - } + /* Pass through parameters to XLogPageRead */ + private->fetching_ckpt = fetching_ckpt; + private->emode = emode; + private->randAccess = (RecPtr != InvalidXLogRecPtr); /* This is the first try to read this page. */ lastSourceFailed = false; -retry: - /* Read the page containing the record */ - if (!XLogPageRead(RecPtr, emode, fetching_ckpt, randAccess)) - return NULL; - pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf); - targetRecOff = (*RecPtr) % XLOG_BLCKSZ; - if (targetRecOff == 0) + do { - /* - * At page start, so skip over page header. The Assert checks that - * we're not scribbling on caller's record pointer; it's OK because we - * can only get here in the continuing-from-prev-record case, since - * XRecOffIsValid rejected the zero-page-offset case otherwise. 
- */ - Assert(RecPtr == &tmpRecPtr); - (*RecPtr) += pageHeaderSize; - targetRecOff = pageHeaderSize; - } - else if (targetRecOff < pageHeaderSize) - { - ereport(emode_for_corrupt_record(emode, *RecPtr), - (errmsg("invalid record offset at %X/%X", - (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr))); - goto next_record_is_invalid; - } - if ((((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) && - targetRecOff == pageHeaderSize) - { - ereport(emode_for_corrupt_record(emode, *RecPtr), - (errmsg("contrecord is requested by %X/%X", - (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr))); - goto next_record_is_invalid; - } + char *errormsg; - /* - * Read the record length. - * - * NB: Even though we use an XLogRecord pointer here, the whole record - * header might not fit on this page. xl_tot_len is the first field of - * the struct, so it must be on this page (the records are MAXALIGNed), - * but we cannot access any other fields until we've verified that we - * got the whole header. - */ - record = (XLogRecord *) (readBuf + (*RecPtr) % XLOG_BLCKSZ); - total_len = record->xl_tot_len; - - /* - * If the whole record header is on this page, validate it immediately. - * Otherwise do just a basic sanity check on xl_tot_len, and validate the - * rest of the header after reading it from the next page. The xl_tot_len - * check is necessary here to ensure that we enter the "Need to reassemble - * record" code path below; otherwise we might fail to apply - * ValidXLogRecordHeader at all. 
- */ - if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord) - { - if (!ValidXLogRecordHeader(RecPtr, record, emode, randAccess)) - goto next_record_is_invalid; - gotheader = true; - } - else - { - if (total_len < SizeOfXLogRecord) + record = XLogReadRecord(xlogreader, RecPtr, &errormsg); + ReadRecPtr = xlogreader->ReadRecPtr; + EndRecPtr = xlogreader->EndRecPtr; + if (record == NULL) { - ereport(emode_for_corrupt_record(emode, *RecPtr), - (errmsg("invalid record length at %X/%X", - (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr))); - goto next_record_is_invalid; + /* not all failures fill errormsg; report those that do */ + if (errormsg && errormsg[0] != '\0') + ereport(emode_for_corrupt_record(emode, + RecPtr ? RecPtr : EndRecPtr), + (errmsg_internal("%s", errormsg) /* already translated */)); + + lastSourceFailed = true; + + if (readFile >= 0) + { + close(readFile); + readFile = -1; + } + break; } - gotheader = false; - } - - /* - * Allocate or enlarge readRecordBuf as needed. To avoid useless small - * increases, round its size to a multiple of XLOG_BLCKSZ, and make sure - * it's at least 4*Max(BLCKSZ, XLOG_BLCKSZ) to start with. (That is - * enough for all "normal" records, but very large commit or abort records - * might need more space.) 
- */ - if (total_len > readRecordBufSize) - { - uint32 newSize = total_len; - - newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ); - newSize = Max(newSize, 4 * Max(BLCKSZ, XLOG_BLCKSZ)); - if (readRecordBuf) - free(readRecordBuf); - readRecordBuf = (char *) malloc(newSize); - if (!readRecordBuf) - { - readRecordBufSize = 0; - /* We treat this as a "bogus data" condition */ - ereport(emode_for_corrupt_record(emode, *RecPtr), - (errmsg("record length %u at %X/%X too long", - total_len, (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr))); - goto next_record_is_invalid; - } - readRecordBufSize = newSize; - } - - len = XLOG_BLCKSZ - (*RecPtr) % XLOG_BLCKSZ; - if (total_len > len) - { - /* Need to reassemble record */ - char *contrecord; - XLogPageHeader pageHeader; - XLogRecPtr pagelsn; - char *buffer; - uint32 gotlen; - - /* Initialize pagelsn to the beginning of the page this record is on */ - pagelsn = ((*RecPtr) / XLOG_BLCKSZ) * XLOG_BLCKSZ; - - /* Copy the first fragment of the record from the first page. */ - memcpy(readRecordBuf, readBuf + (*RecPtr) % XLOG_BLCKSZ, len); - buffer = readRecordBuf + len; - gotlen = len; - - do - { - /* Calculate pointer to beginning of next page */ - pagelsn += XLOG_BLCKSZ; - /* Wait for the next page to become available */ - if (!XLogPageRead(&pagelsn, emode, false, false)) - return NULL; - - /* Check that the continuation on next page looks valid */ - pageHeader = (XLogPageHeader) readBuf; - if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD)) - { - ereport(emode_for_corrupt_record(emode, *RecPtr), - (errmsg("there is no contrecord flag in log segment %s, offset %u", - XLogFileNameP(curFileTLI, readSegNo), - readOff))); - goto next_record_is_invalid; - } - /* - * Cross-check that xlp_rem_len agrees with how much of the record - * we expect there to be left. 
- */ - if (pageHeader->xlp_rem_len == 0 || - total_len != (pageHeader->xlp_rem_len + gotlen)) - { - ereport(emode_for_corrupt_record(emode, *RecPtr), - (errmsg("invalid contrecord length %u in log segment %s, offset %u", - pageHeader->xlp_rem_len, - XLogFileNameP(curFileTLI, readSegNo), - readOff))); - goto next_record_is_invalid; - } - - /* Append the continuation from this page to the buffer */ - pageHeaderSize = XLogPageHeaderSize(pageHeader); - contrecord = (char *) readBuf + pageHeaderSize; - len = XLOG_BLCKSZ - pageHeaderSize; - if (pageHeader->xlp_rem_len < len) - len = pageHeader->xlp_rem_len; - memcpy(buffer, (char *) contrecord, len); - buffer += len; - gotlen += len; - - /* If we just reassembled the record header, validate it. */ - if (!gotheader) - { - record = (XLogRecord *) readRecordBuf; - if (!ValidXLogRecordHeader(RecPtr, record, emode, randAccess)) - goto next_record_is_invalid; - gotheader = true; - } - } while (pageHeader->xlp_rem_len > len); - - record = (XLogRecord *) readRecordBuf; - if (!RecordIsValid(record, *RecPtr, emode)) - goto next_record_is_invalid; - pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf); - XLogSegNoOffsetToRecPtr( - readSegNo, - readOff + pageHeaderSize + MAXALIGN(pageHeader->xlp_rem_len), - EndRecPtr); - ReadRecPtr = *RecPtr; - } - else - { - /* Record does not cross a page boundary */ - if (!RecordIsValid(record, *RecPtr, emode)) - goto next_record_is_invalid; - EndRecPtr = *RecPtr + MAXALIGN(total_len); - - ReadRecPtr = *RecPtr; - memcpy(readRecordBuf, record, total_len); - } - - /* - * Special processing if it's an XLOG SWITCH record - */ - if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH) - { - /* Pretend it extends to end of segment */ - EndRecPtr += XLogSegSize - 1; - EndRecPtr -= EndRecPtr % XLogSegSize; /* - * Pretend that readBuf contains the last page of the segment. This is - * just to avoid Assert failure in StartupXLOG if XLOG ends with this - * segment. 
+ * Check page TLI is one of the expected values. */ - readOff = XLogSegSize - XLOG_BLCKSZ; - } + if (!tliInHistory(xlogreader->latestPageTLI, expectedTLEs)) + { + char fname[MAXFNAMELEN]; + XLogSegNo segno; + int32 offset; + + XLByteToSeg(xlogreader->latestPagePtr, segno); + offset = xlogreader->latestPagePtr % XLogSegSize; + XLogFileName(fname, xlogreader->readPageTLI, segno); + ereport(emode_for_corrupt_record(emode, + RecPtr ? RecPtr : EndRecPtr), + (errmsg("unexpected timeline ID %u in log segment %s, offset %u", + xlogreader->latestPageTLI, + fname, + offset))); + return false; + } + } while (StandbyMode && record == NULL); + return record; - -next_record_is_invalid: - lastSourceFailed = true; - - if (readFile >= 0) - { - close(readFile); - readFile = -1; - } - - /* In standby-mode, keep trying */ - if (StandbyMode) - goto retry; - else - return NULL; -} - -/* - * Check whether the xlog header of a page just read in looks valid. - * - * This is just a convenience subroutine to avoid duplicated code in - * ReadRecord. It's not intended for use from anywhere else. 
- */ -static bool -ValidXLogPageHeader(XLogPageHeader hdr, int emode, bool segmentonly) -{ - XLogRecPtr recaddr; - - XLogSegNoOffsetToRecPtr(readSegNo, readOff, recaddr); - - if (hdr->xlp_magic != XLOG_PAGE_MAGIC) - { - ereport(emode_for_corrupt_record(emode, recaddr), - (errmsg("invalid magic number %04X in log segment %s, offset %u", - hdr->xlp_magic, - XLogFileNameP(curFileTLI, readSegNo), - readOff))); - return false; - } - if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0) - { - ereport(emode_for_corrupt_record(emode, recaddr), - (errmsg("invalid info bits %04X in log segment %s, offset %u", - hdr->xlp_info, - XLogFileNameP(curFileTLI, readSegNo), - readOff))); - return false; - } - if (hdr->xlp_info & XLP_LONG_HEADER) - { - XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr; - - if (longhdr->xlp_sysid != ControlFile->system_identifier) - { - char fhdrident_str[32]; - char sysident_str[32]; - - /* - * Format sysids separately to keep platform-dependent format code - * out of the translatable message string. 
- */ - snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT, - longhdr->xlp_sysid); - snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT, - ControlFile->system_identifier); - ereport(emode_for_corrupt_record(emode, recaddr), - (errmsg("WAL file is from different database system"), - errdetail("WAL file database system identifier is %s, pg_control database system identifier is %s.", - fhdrident_str, sysident_str))); - return false; - } - if (longhdr->xlp_seg_size != XLogSegSize) - { - ereport(emode_for_corrupt_record(emode, recaddr), - (errmsg("WAL file is from different database system"), - errdetail("Incorrect XLOG_SEG_SIZE in page header."))); - return false; - } - if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ) - { - ereport(emode_for_corrupt_record(emode, recaddr), - (errmsg("WAL file is from different database system"), - errdetail("Incorrect XLOG_BLCKSZ in page header."))); - return false; - } - } - else if (readOff == 0) - { - /* hmm, first page of file doesn't have a long header? */ - ereport(emode_for_corrupt_record(emode, recaddr), - (errmsg("invalid info bits %04X in log segment %s, offset %u", - hdr->xlp_info, - XLogFileNameP(curFileTLI, readSegNo), - readOff))); - return false; - } - - if (hdr->xlp_pageaddr != recaddr) - { - ereport(emode_for_corrupt_record(emode, recaddr), - (errmsg("unexpected pageaddr %X/%X in log segment %s, offset %u", - (uint32) (hdr->xlp_pageaddr >> 32), (uint32) hdr->xlp_pageaddr, - XLogFileNameP(curFileTLI, readSegNo), - readOff))); - return false; - } - - /* - * Check page TLI is one of the expected values. 
- */ - if (!tliInHistory(hdr->xlp_tli, expectedTLEs)) - { - ereport(emode_for_corrupt_record(emode, recaddr), - (errmsg("unexpected timeline ID %u in log segment %s, offset %u", - hdr->xlp_tli, - XLogFileNameP(curFileTLI, readSegNo), - readOff))); - return false; - } - - /* - * Since child timelines are always assigned a TLI greater than their - * immediate parent's TLI, we should never see TLI go backwards across - * successive pages of a consistent WAL sequence. - * - * Of course this check should only be applied when advancing sequentially - * across pages; therefore ReadRecord resets lastPageTLI and - * lastSegmentTLI to zero when going to a random page. - * - * Sometimes we re-open a segment that's already been partially replayed. - * In that case we cannot perform the normal TLI check: if there is a - * timeline switch within the segment, the first page has a smaller TLI - * than later pages following the timeline switch, and we might've read - * them already. As a weaker test, we still check that it's not smaller - * than the TLI we last saw at the beginning of a segment. Pass - * segmentonly = true when re-validating the first page like that, and the - * page you're actually interested in comes later. - */ - if (hdr->xlp_tli < (segmentonly ? lastSegmentTLI : lastPageTLI)) - { - ereport(emode_for_corrupt_record(emode, recaddr), - (errmsg("out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u", - hdr->xlp_tli, - segmentonly ? lastSegmentTLI : lastPageTLI, - XLogFileNameP(curFileTLI, readSegNo), - readOff))); - return false; - } - lastPageTLI = hdr->xlp_tli; - if (readOff == 0) - lastSegmentTLI = hdr->xlp_tli; - - return true; -} - -/* - * Validate an XLOG record header. - * - * This is just a convenience subroutine to avoid duplicated code in - * ReadRecord. It's not intended for use from anywhere else. 
- */ -static bool -ValidXLogRecordHeader(XLogRecPtr *RecPtr, XLogRecord *record, int emode, - bool randAccess) -{ - /* - * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is - * required. - */ - if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH) - { - if (record->xl_len != 0) - { - ereport(emode_for_corrupt_record(emode, *RecPtr), - (errmsg("invalid xlog switch record at %X/%X", - (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr))); - return false; - } - } - else if (record->xl_len == 0) - { - ereport(emode_for_corrupt_record(emode, *RecPtr), - (errmsg("record with zero length at %X/%X", - (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr))); - return false; - } - if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len || - record->xl_tot_len > SizeOfXLogRecord + record->xl_len + - XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ)) - { - ereport(emode_for_corrupt_record(emode, *RecPtr), - (errmsg("invalid record length at %X/%X", - (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr))); - return false; - } - if (record->xl_rmid > RM_MAX_ID) - { - ereport(emode_for_corrupt_record(emode, *RecPtr), - (errmsg("invalid resource manager ID %u at %X/%X", - record->xl_rmid, (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr))); - return false; - } - if (randAccess) - { - /* - * We can't exactly verify the prev-link, but surely it should be less - * than the record's own address. - */ - if (!(record->xl_prev < *RecPtr)) - { - ereport(emode_for_corrupt_record(emode, *RecPtr), - (errmsg("record with incorrect prev-link %X/%X at %X/%X", - (uint32) (record->xl_prev >> 32), (uint32) record->xl_prev, - (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr))); - return false; - } - } - else - { - /* - * Record's prev-link should exactly match our previous location. This - * check guards against torn WAL pages where a stale but valid-looking - * WAL record starts on a sector boundary. 
- */ - if (record->xl_prev != ReadRecPtr) - { - ereport(emode_for_corrupt_record(emode, *RecPtr), - (errmsg("record with incorrect prev-link %X/%X at %X/%X", - (uint32) (record->xl_prev >> 32), (uint32) record->xl_prev, - (uint32) ((*RecPtr) >> 32), (uint32) *RecPtr))); - return false; - } - } - - return true; } /* @@ -5235,6 +4692,8 @@ StartupXLOG(void) bool backupEndRequired = false; bool backupFromStandby = false; DBState dbstate_at_startup; + XLogReaderState *xlogreader; + XLogPageReadPrivate private; /* * Read control file and check XLOG status looks valid. @@ -5351,6 +4810,16 @@ StartupXLOG(void) if (StandbyMode) OwnLatch(&XLogCtl->recoveryWakeupLatch); + /* Set up XLOG reader facility */ + MemSet(&private, 0, sizeof(XLogPageReadPrivate)); + xlogreader = XLogReaderAllocate(&XLogPageRead, &private); + if (!xlogreader) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"), + errdetail("Failed while allocating an XLog reading processor"))); + xlogreader->system_identifier = ControlFile->system_identifier; + if (read_backup_label(&checkPointLoc, &backupEndRequired, &backupFromStandby)) { @@ -5358,7 +4827,7 @@ StartupXLOG(void) * When a backup_label file is present, we want to roll forward from * the checkpoint it identifies, rather than using pg_control. 
*/ - record = ReadCheckpointRecord(checkPointLoc, 0); + record = ReadCheckpointRecord(xlogreader, checkPointLoc, 0); if (record != NULL) { memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint)); @@ -5376,7 +4845,7 @@ StartupXLOG(void) */ if (checkPoint.redo < checkPointLoc) { - if (!ReadRecord(&(checkPoint.redo), LOG, false)) + if (!ReadRecord(xlogreader, checkPoint.redo, LOG, false)) ereport(FATAL, (errmsg("could not find redo location referenced by checkpoint record"), errhint("If you are not restoring from a backup, try removing the file \"%s/backup_label\".", DataDir))); @@ -5400,7 +4869,7 @@ StartupXLOG(void) */ checkPointLoc = ControlFile->checkPoint; RedoStartLSN = ControlFile->checkPointCopy.redo; - record = ReadCheckpointRecord(checkPointLoc, 1); + record = ReadCheckpointRecord(xlogreader, checkPointLoc, 1); if (record != NULL) { ereport(DEBUG1, @@ -5419,7 +4888,7 @@ StartupXLOG(void) else { checkPointLoc = ControlFile->prevCheckPoint; - record = ReadCheckpointRecord(checkPointLoc, 2); + record = ReadCheckpointRecord(xlogreader, checkPointLoc, 2); if (record != NULL) { ereport(LOG, @@ -5777,12 +5246,12 @@ StartupXLOG(void) if (checkPoint.redo < RecPtr) { /* back up to find the record */ - record = ReadRecord(&(checkPoint.redo), PANIC, false); + record = ReadRecord(xlogreader, checkPoint.redo, PANIC, false); } else { /* just have to read next record after CheckPoint */ - record = ReadRecord(NULL, LOG, false); + record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false); } if (record != NULL) @@ -5963,7 +5432,7 @@ StartupXLOG(void) break; /* Else, try to fetch the next WAL record */ - record = ReadRecord(NULL, LOG, false); + record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false); } while (record != NULL); /* @@ -6013,7 +5482,7 @@ StartupXLOG(void) * Re-fetch the last valid or last applied record, so we can identify the * exact endpoint of what we consider the valid portion of WAL. 
*/ - record = ReadRecord(&LastRec, PANIC, false); + record = ReadRecord(xlogreader, LastRec, PANIC, false); EndOfLog = EndRecPtr; XLByteToPrevSeg(EndOfLog, endLogSegNo); @@ -6117,7 +5586,7 @@ StartupXLOG(void) * we will use that below.) */ if (InArchiveRecovery) - exitArchiveRecovery(curFileTLI, endLogSegNo); + exitArchiveRecovery(xlogreader->readPageTLI, endLogSegNo); /* * Prepare to write WAL starting at EndOfLog position, and init xlog @@ -6136,8 +5605,15 @@ StartupXLOG(void) * record spans, not the one it starts in. The last block is indeed the * one we want to use. */ - Assert(readOff == (XLogCtl->xlblocks[0] - XLOG_BLCKSZ) % XLogSegSize); - memcpy((char *) Insert->currpage, readBuf, XLOG_BLCKSZ); + if (EndOfLog % XLOG_BLCKSZ == 0) + { + memset(Insert->currpage, 0, XLOG_BLCKSZ); + } + else + { + Assert(readOff == (XLogCtl->xlblocks[0] - XLOG_BLCKSZ) % XLogSegSize); + memcpy((char *) Insert->currpage, xlogreader->readBuf, XLOG_BLCKSZ); + } Insert->currpos = (char *) Insert->currpage + (EndOfLog + XLOG_BLCKSZ - XLogCtl->xlblocks[0]); @@ -6288,23 +5764,13 @@ StartupXLOG(void) if (standbyState != STANDBY_DISABLED) ShutdownRecoveryTransactionEnvironment(); - /* Shut down readFile facility, free space */ + /* Shut down xlogreader */ if (readFile >= 0) { close(readFile); readFile = -1; } - if (readBuf) - { - free(readBuf); - readBuf = NULL; - } - if (readRecordBuf) - { - free(readRecordBuf); - readRecordBuf = NULL; - readRecordBufSize = 0; - } + XLogReaderFree(xlogreader); /* * If any of the critical GUCs have changed, log them before we allow @@ -6554,7 +6020,8 @@ LocalSetXLogInsertAllowed(void) * 1 for "primary", 2 for "secondary", 0 for "other" (backup_label) */ static XLogRecord * -ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt) +ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, + int whichChkpt) { XLogRecord *record; @@ -6578,7 +6045,7 @@ ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt) return NULL; } - record = 
ReadRecord(&RecPtr, LOG, true); + record = ReadRecord(xlogreader, RecPtr, LOG, true); if (record == NULL) { @@ -9313,7 +8780,9 @@ CancelBackup(void) /* * Read the XLOG page containing RecPtr into readBuf (if not read already). - * Returns true if the page is read successfully. + * Returns number of bytes read, if the page is read successfully, or -1 + * in case of errors. When errors occur, they are ereport'ed, but only + * if they have not been previously reported. * * This is responsible for restoring files from archive as needed, as well * as for waiting for the requested WAL record to arrive in standby mode. @@ -9332,28 +8801,24 @@ CancelBackup(void) * XLogPageRead() to try fetching the record from another source, or to * sleep and retry. */ -static bool -XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, - bool randAccess) +static int +XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, + char *readBuf, TimeLineID *readTLI) { + XLogPageReadPrivate *private = + (XLogPageReadPrivate *) xlogreader->private_data; + int emode = private->emode; uint32 targetPageOff; - uint32 targetRecOff; - XLogSegNo targetSegNo; + XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY; - XLByteToSeg(*RecPtr, targetSegNo); - targetPageOff = (((*RecPtr) % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ; - targetRecOff = (*RecPtr) % XLOG_BLCKSZ; - - /* Fast exit if we have read the record in the current buffer already */ - if (!lastSourceFailed && targetSegNo == readSegNo && - targetPageOff == readOff && targetRecOff < readLen) - return true; + XLByteToSeg(targetPagePtr, targetSegNo); + targetPageOff = targetPagePtr % XLogSegSize; /* * See if we need to switch to a new segment because the requested record * is not in the currently open one. 
*/ - if (readFile >= 0 && !XLByteInSeg(*RecPtr, readSegNo)) + if (readFile >= 0 && !XLByteInSeg(targetPagePtr, readSegNo)) { /* * Request a restartpoint if we've replayed too much xlog since the @@ -9374,39 +8839,34 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, readSource = 0; } - XLByteToSeg(*RecPtr, readSegNo); + XLByteToSeg(targetPagePtr, readSegNo); retry: /* See if we need to retrieve more data */ if (readFile < 0 || - (readSource == XLOG_FROM_STREAM && receivedUpto <= *RecPtr)) + (readSource == XLOG_FROM_STREAM && + receivedUpto <= targetPagePtr + reqLen)) { if (StandbyMode) { - if (!WaitForWALToBecomeAvailable(*RecPtr, randAccess, - fetching_ckpt)) + if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen, + private->randAccess, + private->fetching_ckpt)) goto triggered; } - else + /* In archive or crash recovery. */ + else if (readFile < 0) { - /* In archive or crash recovery. */ + int source; + + if (InArchiveRecovery) + source = XLOG_FROM_ANY; + else + source = XLOG_FROM_PG_XLOG; + + readFile = XLogFileReadAnyTLI(readSegNo, emode, source); if (readFile < 0) - { - int source; - - /* Reset curFileTLI if random fetch. */ - if (randAccess) - curFileTLI = 0; - - if (InArchiveRecovery) - source = XLOG_FROM_ANY; - else - source = XLOG_FROM_PG_XLOG; - - readFile = XLogFileReadAnyTLI(readSegNo, emode, source); - if (readFile < 0) - return false; - } + return -1; } } @@ -9424,72 +8884,46 @@ retry: */ if (readSource == XLOG_FROM_STREAM) { - if (((*RecPtr) / XLOG_BLCKSZ) != (receivedUpto / XLOG_BLCKSZ)) - { + if (((targetPagePtr) / XLOG_BLCKSZ) != (receivedUpto / XLOG_BLCKSZ)) readLen = XLOG_BLCKSZ; - } else readLen = receivedUpto % XLogSegSize - targetPageOff; } else readLen = XLOG_BLCKSZ; - if (!readFileHeaderValidated && targetPageOff != 0) - { - /* - * Whenever switching to a new WAL segment, we read the first page of - * the file and validate its header, even if that's not where the - * target record is. 
This is so that we can check the additional - * identification info that is present in the first page's "long" - * header. - */ - readOff = 0; - if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) - { - char fname[MAXFNAMELEN]; - XLogFileName(fname, curFileTLI, readSegNo); - ereport(emode_for_corrupt_record(emode, *RecPtr), - (errcode_for_file_access(), - errmsg("could not read from log segment %s, offset %u: %m", - fname, readOff))); - goto next_record_is_invalid; - } - if (!ValidXLogPageHeader((XLogPageHeader) readBuf, emode, true)) - goto next_record_is_invalid; - } - /* Read the requested page */ readOff = targetPageOff; if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0) { char fname[MAXFNAMELEN]; + XLogFileName(fname, curFileTLI, readSegNo); - ereport(emode_for_corrupt_record(emode, *RecPtr), + ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen), (errcode_for_file_access(), errmsg("could not seek in log segment %s to offset %u: %m", - fname, readOff))); + fname, readOff))); goto next_record_is_invalid; } + if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) { char fname[MAXFNAMELEN]; + XLogFileName(fname, curFileTLI, readSegNo); - ereport(emode_for_corrupt_record(emode, *RecPtr), + ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen), (errcode_for_file_access(), errmsg("could not read from log segment %s, offset %u: %m", - fname, readOff))); + fname, readOff))); goto next_record_is_invalid; } - if (!ValidXLogPageHeader((XLogPageHeader) readBuf, emode, false)) - goto next_record_is_invalid; - - readFileHeaderValidated = true; Assert(targetSegNo == readSegNo); Assert(targetPageOff == readOff); - Assert(targetRecOff < readLen); + Assert(reqLen <= readLen); - return true; + *readTLI = curFileTLI; + return readLen; next_record_is_invalid: lastSourceFailed = true; @@ -9504,7 +8938,7 @@ next_record_is_invalid: if (StandbyMode) goto retry; else - return false; + return -1; triggered: if (readFile >= 0) @@ -9513,7 +8947,7 @@ triggered: 
readLen = 0; readSource = 0; - return false; + return -1; } /* diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c new file mode 100644 index 0000000000..ff871a3412 --- /dev/null +++ b/src/backend/access/transam/xlogreader.c @@ -0,0 +1,1005 @@ +/*------------------------------------------------------------------------- + * + * xlogreader.c + * Generic XLog reading facility + * + * Portions Copyright (c) 2013, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/access/transam/xlogreader.c + * + * NOTES + * See xlogreader.h for more notes on this facility. + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/transam.h" +#include "access/xlog.h" +#include "access/xlog_internal.h" +#include "access/xlogreader.h" +#include "catalog/pg_control.h" + +static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength); + +static bool ValidXLogPageHeader(XLogReaderState *state, XLogRecPtr recptr, + XLogPageHeader hdr); +static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, + XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess); +static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record, + XLogRecPtr recptr); +static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, + int reqLen); +static void +report_invalid_record(XLogReaderState *state, const char *fmt,...) +/* This extension allows gcc to check the format string for consistency with + the supplied arguments. */ +__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3))); + +/* size of the buffer allocated for error message. */ +#define MAX_ERRORMSG_LEN 1000 + +/* + * Construct a string in state->errormsg_buf explaining what's wrong with + * the current record being read. + */ +static void +report_invalid_record(XLogReaderState *state, const char *fmt,...) 
+{ + va_list args; + + fmt = _(fmt); + + va_start(args, fmt); + vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args); + va_end(args); +} + +/* + * Allocate and initialize a new XLogReader. + * + * Returns NULL if the xlogreader couldn't be allocated. + */ +XLogReaderState * +XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data) +{ + XLogReaderState *state; + + AssertArg(pagereadfunc != NULL); + + state = (XLogReaderState *) malloc(sizeof(XLogReaderState)); + if (!state) + return NULL; + MemSet(state, 0, sizeof(XLogReaderState)); + + /* + * Permanently allocate readBuf. We do it this way, rather than just + * making a static array, for two reasons: (1) no need to waste the + * storage in most instantiations of the backend; (2) a static char array + * isn't guaranteed to have any particular alignment, whereas malloc() + * will provide MAXALIGN'd storage. + */ + state->readBuf = (char *) malloc(XLOG_BLCKSZ); + if (!state->readBuf) + { + free(state); + return NULL; + } + + state->read_page = pagereadfunc; + /* system_identifier initialized to zeroes above */ + state->private_data = private_data; + /* ReadRecPtr and EndRecPtr initialized to zeroes above */ + /* readSegNo, readOff, readLen, readPageTLI initialized to zeroes above */ + state->errormsg_buf = malloc(MAX_ERRORMSG_LEN + 1); + if (!state->errormsg_buf) + { + free(state->readBuf); + free(state); + return NULL; + } + state->errormsg_buf[0] = '\0'; + + /* + * Allocate an initial readRecordBuf of minimal size, which can later be + * enlarged if necessary. + */ + if (!allocate_recordbuf(state, 0)) + { + free(state->errormsg_buf); + free(state->readBuf); + free(state); + return NULL; + } + + return state; +} + +void +XLogReaderFree(XLogReaderState *state) +{ + free(state->errormsg_buf); + if (state->readRecordBuf) + free(state->readRecordBuf); + free(state->readBuf); + free(state); +} + +/* + * Allocate readRecordBuf to fit a record of at least the given length. 
+ * Returns true if successful, false if out of memory. + * + * readRecordBufSize is set to the new buffer size. + * + * To avoid useless small increases, round its size to a multiple of + * XLOG_BLCKSZ, and make sure it's at least 5*Max(BLCKSZ, XLOG_BLCKSZ) to start + * with. (That is enough for all "normal" records, but very large commit or + * abort records might need more space.) + */ +static bool +allocate_recordbuf(XLogReaderState *state, uint32 reclength) +{ + uint32 newSize = reclength; + + newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ); + newSize = Max(newSize, 5 * Max(BLCKSZ, XLOG_BLCKSZ)); + + if (state->readRecordBuf) + free(state->readRecordBuf); + state->readRecordBuf = (char *) malloc(newSize); + if (!state->readRecordBuf) + { + state->readRecordBufSize = 0; + return false; + } + + state->readRecordBufSize = newSize; + return true; +} + +/* + * Attempt to read an XLOG record. + * + * If RecPtr is not NULL, try to read a record at that position. Otherwise + * try to read a record just after the last one previously read. + * + * If the page_read callback fails to read the requested data, NULL is + * returned. The callback is expected to have reported the error; errormsg + * is set to NULL. + * + * If the reading fails for some other reason, NULL is also returned, and + * *errormsg is set to a string with details of the failure. + * + * The returned pointer (or *errormsg) points to an internal buffer that's + * valid until the next call to XLogReadRecord. 
+ */ +XLogRecord * +XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) +{ + XLogRecord *record; + XLogRecPtr targetPagePtr; + bool randAccess = false; + uint32 len, + total_len; + uint32 targetRecOff; + uint32 pageHeaderSize; + bool gotheader; + int readOff; + + randAccess = false; + /* reset error state */ + *errormsg = NULL; + state->errormsg_buf[0] = '\0'; + + if (RecPtr == InvalidXLogRecPtr) + { + RecPtr = state->EndRecPtr; + + if (state->ReadRecPtr == InvalidXLogRecPtr) + randAccess = true; + + /* + * RecPtr is pointing to end+1 of the previous WAL record. If we're + * at a page boundary, no more records can fit on the current page. We + * must skip over the page header, but we can't do that until we've + * read in the page, since the header size is variable. + */ + } + else + { + /* + * In this case, the passed-in record pointer should already be + * pointing to a valid record starting position. + */ + Assert(XRecOffIsValid(RecPtr)); + randAccess = true; /* allow readPageTLI to go backwards too */ + } + + targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ); + + /* Read the page containing the record into state->readBuf */ + readOff = ReadPageInternal(state, targetPagePtr, SizeOfXLogRecord); + + if (readOff < 0) + { + if (state->errormsg_buf[0] != '\0') + *errormsg = state->errormsg_buf; + return NULL; + } + + /* + * ReadPageInternal always returns at least the page header, so we can + * examine it now. + */ + pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf); + targetRecOff = RecPtr % XLOG_BLCKSZ; + if (targetRecOff == 0) + { + /* + * At page start, so skip over page header. 
+ */ + RecPtr += pageHeaderSize; + targetRecOff = pageHeaderSize; + } + else if (targetRecOff < pageHeaderSize) + { + report_invalid_record(state, "invalid record offset at %X/%X", + (uint32) (RecPtr >> 32), (uint32) RecPtr); + *errormsg = state->errormsg_buf; + return NULL; + } + + if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) && + targetRecOff == pageHeaderSize) + { + report_invalid_record(state, "contrecord is requested by %X/%X", + (uint32) (RecPtr >> 32), (uint32) RecPtr); + *errormsg = state->errormsg_buf; + return NULL; + } + + /* ReadPageInternal has verified the page header */ + Assert(pageHeaderSize <= readOff); + + /* + * Ensure the whole record header or at least the part on this page is + * read. + */ + readOff = ReadPageInternal(state, + targetPagePtr, + Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ)); + if (readOff < 0) + { + if (state->errormsg_buf[0] != '\0') + *errormsg = state->errormsg_buf; + return NULL; + } + + /* + * Read the record length. + * + * NB: Even though we use an XLogRecord pointer here, the whole record + * header might not fit on this page. xl_tot_len is the first field of the + * struct, so it must be on this page (the records are MAXALIGNed), but we + * cannot access any other fields until we've verified that we got the + * whole header. + */ + record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ); + total_len = record->xl_tot_len; + + /* + * If the whole record header is on this page, validate it immediately. + * Otherwise do just a basic sanity check on xl_tot_len, and validate the + * rest of the header after reading it from the next page. The xl_tot_len + * check is necessary here to ensure that we enter the "Need to reassemble + * record" code path below; otherwise we might fail to apply + * ValidXLogRecordHeader at all. 
+ */ + if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord) + { + if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record, + randAccess)) + { + if (state->errormsg_buf[0] != '\0') + *errormsg = state->errormsg_buf; + return NULL; + } + gotheader = true; + } + else + { + /* XXX: more validation should be done here */ + if (total_len < SizeOfXLogRecord) + { + report_invalid_record(state, "invalid record length at %X/%X", + (uint32) (RecPtr >> 32), (uint32) RecPtr); + *errormsg = state->errormsg_buf; + return NULL; + } + gotheader = false; + } + + /* + * Enlarge readRecordBuf as needed. + */ + if (total_len > state->readRecordBufSize && + !allocate_recordbuf(state, total_len)) + { + /* We treat this as a "bogus data" condition */ + report_invalid_record(state, "record length %u at %X/%X too long", + total_len, + (uint32) (RecPtr >> 32), (uint32) RecPtr); + *errormsg = state->errormsg_buf; + return NULL; + } + + len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ; + if (total_len > len) + { + /* Need to reassemble record */ + char *contdata; + XLogPageHeader pageHeader; + char *buffer; + uint32 gotlen; + + /* Copy the first fragment of the record from the first page. 
*/ + memcpy(state->readRecordBuf, + state->readBuf + RecPtr % XLOG_BLCKSZ, len); + buffer = state->readRecordBuf + len; + gotlen = len; + + do + { + /* Calculate pointer to beginning of next page */ + targetPagePtr += XLOG_BLCKSZ; + + /* Wait for the next page to become available */ + readOff = ReadPageInternal(state, targetPagePtr, + Min(total_len - gotlen + SizeOfXLogShortPHD, + XLOG_BLCKSZ)); + + if (readOff < 0) + goto err; + + Assert(SizeOfXLogShortPHD <= readOff); + + /* Check that the continuation on next page looks valid */ + pageHeader = (XLogPageHeader) state->readBuf; + if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD)) + { + report_invalid_record(state, + "there is no contrecord flag at %X/%X", + (uint32) (RecPtr >> 32), (uint32) RecPtr); + goto err; + } + + /* + * Cross-check that xlp_rem_len agrees with how much of the record + * we expect there to be left. + */ + if (pageHeader->xlp_rem_len == 0 || + total_len != (pageHeader->xlp_rem_len + gotlen)) + { + report_invalid_record(state, + "invalid contrecord length %u at %X/%X", + pageHeader->xlp_rem_len, + (uint32) (RecPtr >> 32), (uint32) RecPtr); + goto err; + } + + /* Append the continuation from this page to the buffer */ + pageHeaderSize = XLogPageHeaderSize(pageHeader); + + if (readOff < pageHeaderSize) + readOff = ReadPageInternal(state, targetPagePtr, + pageHeaderSize); + + Assert(pageHeaderSize <= readOff); + + contdata = (char *) state->readBuf + pageHeaderSize; + len = XLOG_BLCKSZ - pageHeaderSize; + if (pageHeader->xlp_rem_len < len) + len = pageHeader->xlp_rem_len; + + if (readOff < pageHeaderSize + len) + readOff = ReadPageInternal(state, targetPagePtr, + pageHeaderSize + len); + + memcpy(buffer, (char *) contdata, len); + buffer += len; + gotlen += len; + + /* If we just reassembled the record header, validate it. 
*/ + if (!gotheader) + { + record = (XLogRecord *) state->readRecordBuf; + if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, + record, randAccess)) + goto err; + gotheader = true; + } + } while (gotlen < total_len); + + Assert(gotheader); + + record = (XLogRecord *) state->readRecordBuf; + if (!ValidXLogRecord(state, record, RecPtr)) + goto err; + + pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf); + state->ReadRecPtr = RecPtr; + state->EndRecPtr = targetPagePtr + pageHeaderSize + + MAXALIGN(pageHeader->xlp_rem_len); + } + else + { + /* Wait for the record data to become available */ + readOff = ReadPageInternal(state, targetPagePtr, + Min(targetRecOff + total_len, XLOG_BLCKSZ)); + if (readOff < 0) + goto err; + + /* Record does not cross a page boundary */ + if (!ValidXLogRecord(state, record, RecPtr)) + goto err; + + state->EndRecPtr = RecPtr + MAXALIGN(total_len); + + state->ReadRecPtr = RecPtr; + memcpy(state->readRecordBuf, record, total_len); + } + + /* + * Special processing if it's an XLOG SWITCH record + */ + if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH) + { + /* Pretend it extends to end of segment */ + state->EndRecPtr += XLogSegSize - 1; + state->EndRecPtr -= state->EndRecPtr % XLogSegSize; + } + + return record; + +err: + + /* + * Invalidate the xlog page we've cached. We might read from a different + * source after failure. + */ + state->readSegNo = 0; + state->readOff = 0; + state->readLen = 0; + + if (state->errormsg_buf[0] != '\0') + *errormsg = state->errormsg_buf; + + return NULL; +} + +/* + * Read a single xlog page including at least [pagestart, RecPtr] of valid data + * via the read_page() callback. + * + * Returns -1 if the required page cannot be read for some reason; errormsg_buf + * is set in that case (unless the error occurs in the read_page callback). 
+ * + * We fetch the page from a reader-local cache if we know we have the required + * data and if there hasn't been any error since caching the data. + */ +static int +ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) +{ + int readLen; + uint32 targetPageOff; + XLogSegNo targetSegNo; + XLogPageHeader hdr; + + Assert((pageptr % XLOG_BLCKSZ) == 0); + + XLByteToSeg(pageptr, targetSegNo); + targetPageOff = (pageptr % XLogSegSize); + + /* check whether we have all the requested data already */ + if (targetSegNo == state->readSegNo && targetPageOff == state->readOff && + reqLen < state->readLen) + return state->readLen; + + /* + * Data is not in our buffer. + * + * Every time we actually read the page, even if we looked at parts of it + * before, we need to do verification as the read_page callback might now + * be rereading data from a different source. + * + * Whenever switching to a new WAL segment, we read the first page of the + * file and validate its header, even if that's not where the target + * record is. This is so that we can check the additional identification + * info that is present in the first page's "long" header. + */ + if (targetSegNo != state->readSegNo && + targetPageOff != 0) + { + XLogPageHeader hdr; + XLogRecPtr targetSegmentPtr = pageptr - targetPageOff; + + readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ, + state->readBuf, &state->readPageTLI); + if (readLen < 0) + goto err; + + /* we can be sure to have enough WAL available, we scrolled back */ + Assert(readLen == XLOG_BLCKSZ); + + hdr = (XLogPageHeader) state->readBuf; + + if (!ValidXLogPageHeader(state, targetSegmentPtr, hdr)) + goto err; + } + + /* + * First, read the requested data length, but at least a short page header + * so that we can validate it. 
+ */ + readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD), + state->readBuf, &state->readPageTLI); + if (readLen < 0) + goto err; + + Assert(readLen <= XLOG_BLCKSZ); + + /* Do we have enough data to check the header length? */ + if (readLen <= SizeOfXLogShortPHD) + goto err; + + Assert(readLen >= reqLen); + + hdr = (XLogPageHeader) state->readBuf; + + /* still not enough */ + if (readLen < XLogPageHeaderSize(hdr)) + { + readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr), + state->readBuf, &state->readPageTLI); + if (readLen < 0) + goto err; + } + + /* + * Now that we know we have the full header, validate it. + */ + if (!ValidXLogPageHeader(state, pageptr, hdr)) + goto err; + + /* update cache information */ + state->readSegNo = targetSegNo; + state->readOff = targetPageOff; + state->readLen = readLen; + + return readLen; + +err: + state->readSegNo = 0; + state->readOff = 0; + state->readLen = 0; + return -1; +} + +/* + * Validate an XLOG record header. + * + * This is just a convenience subroutine to avoid duplicated code in + * XLogReadRecord. It's not intended for use from anywhere else. + */ +static bool +ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, + XLogRecPtr PrevRecPtr, XLogRecord *record, + bool randAccess) +{ + /* + * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is + * required. 
+ */ + if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH) + { + if (record->xl_len != 0) + { + report_invalid_record(state, + "invalid xlog switch record at %X/%X", + (uint32) (RecPtr >> 32), (uint32) RecPtr); + return false; + } + } + else if (record->xl_len == 0) + { + report_invalid_record(state, + "record with zero length at %X/%X", + (uint32) (RecPtr >> 32), (uint32) RecPtr); + return false; + } + if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len || + record->xl_tot_len > SizeOfXLogRecord + record->xl_len + + XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ)) + { + report_invalid_record(state, + "invalid record length at %X/%X", + (uint32) (RecPtr >> 32), (uint32) RecPtr); + return false; + } + if (record->xl_rmid > RM_MAX_ID) + { + report_invalid_record(state, + "invalid resource manager ID %u at %X/%X", + record->xl_rmid, (uint32) (RecPtr >> 32), + (uint32) RecPtr); + return false; + } + if (randAccess) + { + /* + * We can't exactly verify the prev-link, but surely it should be less + * than the record's own address. + */ + if (!(record->xl_prev < RecPtr)) + { + report_invalid_record(state, + "record with incorrect prev-link %X/%X at %X/%X", + (uint32) (record->xl_prev >> 32), + (uint32) record->xl_prev, + (uint32) (RecPtr >> 32), (uint32) RecPtr); + return false; + } + } + else + { + /* + * Record's prev-link should exactly match our previous location. This + * check guards against torn WAL pages where a stale but valid-looking + * WAL record starts on a sector boundary. + */ + if (record->xl_prev != PrevRecPtr) + { + report_invalid_record(state, + "record with incorrect prev-link %X/%X at %X/%X", + (uint32) (record->xl_prev >> 32), + (uint32) record->xl_prev, + (uint32) (RecPtr >> 32), (uint32) RecPtr); + return false; + } + } + + return true; +} + + +/* + * CRC-check an XLOG record. 
We do not believe the contents of an XLOG + * record (other than to the minimal extent of computing the amount of + * data to read in) until we've checked the CRCs. + * + * We assume all of the record (that is, xl_tot_len bytes) has been read + * into memory at *record. Also, ValidXLogRecordHeader() has accepted the + * record's header, which means in particular that xl_tot_len is at least + * SizeOfXlogRecord, so it is safe to fetch xl_len. + */ +static bool +ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr) +{ + pg_crc32 crc; + int i; + uint32 len = record->xl_len; + BkpBlock bkpb; + char *blk; + size_t remaining = record->xl_tot_len; + + /* First the rmgr data */ + if (remaining < SizeOfXLogRecord + len) + { + /* ValidXLogRecordHeader() should've caught this already... */ + report_invalid_record(state, "invalid record length at %X/%X", + (uint32) (recptr >> 32), (uint32) recptr); + return false; + } + remaining -= SizeOfXLogRecord + len; + INIT_CRC32(crc); + COMP_CRC32(crc, XLogRecGetData(record), len); + + /* Add in the backup blocks, if any */ + blk = (char *) XLogRecGetData(record) + len; + for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) + { + uint32 blen; + + if (!(record->xl_info & XLR_BKP_BLOCK(i))) + continue; + + if (remaining < sizeof(BkpBlock)) + { + report_invalid_record(state, + "invalid backup block size in record at %X/%X", + (uint32) (recptr >> 32), (uint32) recptr); + return false; + } + memcpy(&bkpb, blk, sizeof(BkpBlock)); + + if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ) + { + report_invalid_record(state, + "incorrect hole size in record at %X/%X", + (uint32) (recptr >> 32), (uint32) recptr); + return false; + } + blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length; + + if (remaining < blen) + { + report_invalid_record(state, + "invalid backup block size in record at %X/%X", + (uint32) (recptr >> 32), (uint32) recptr); + return false; + } + remaining -= blen; + COMP_CRC32(crc, blk, blen); + blk += blen; + } + + /* 
Check that xl_tot_len agrees with our calculation */ + if (remaining != 0) + { + report_invalid_record(state, + "incorrect total length in record at %X/%X", + (uint32) (recptr >> 32), (uint32) recptr); + return false; + } + + /* Finally include the record header */ + COMP_CRC32(crc, (char *) record, offsetof(XLogRecord, xl_crc)); + FIN_CRC32(crc); + + if (!EQ_CRC32(record->xl_crc, crc)) + { + report_invalid_record(state, + "incorrect resource manager data checksum in record at %X/%X", + (uint32) (recptr >> 32), (uint32) recptr); + return false; + } + + return true; +} + +/* + * Validate a page header + */ +static bool +ValidXLogPageHeader(XLogReaderState *state, XLogRecPtr recptr, + XLogPageHeader hdr) +{ + XLogRecPtr recaddr; + XLogSegNo segno; + int32 offset; + + Assert((recptr % XLOG_BLCKSZ) == 0); + + XLByteToSeg(recptr, segno); + offset = recptr % XLogSegSize; + + XLogSegNoOffsetToRecPtr(segno, offset, recaddr); + + if (hdr->xlp_magic != XLOG_PAGE_MAGIC) + { + char fname[MAXFNAMELEN]; + + XLogFileName(fname, state->readPageTLI, segno); + + report_invalid_record(state, + "invalid magic number %04X in log segment %s, offset %u", + hdr->xlp_magic, + fname, + offset); + return false; + } + + if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0) + { + char fname[MAXFNAMELEN]; + + XLogFileName(fname, state->readPageTLI, segno); + + report_invalid_record(state, + "invalid info bits %04X in log segment %s, offset %u", + hdr->xlp_info, + fname, + offset); + return false; + } + + if (hdr->xlp_info & XLP_LONG_HEADER) + { + XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr; + + if (state->system_identifier && + longhdr->xlp_sysid != state->system_identifier) + { + char fhdrident_str[32]; + char sysident_str[32]; + + /* + * Format sysids separately to keep platform-dependent format code + * out of the translatable message string. 
+ */ + snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT, + longhdr->xlp_sysid); + snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT, + state->system_identifier); + report_invalid_record(state, + "WAL file is from different database system: WAL file database system identifier is %s, pg_control database system identifier is %s.", + fhdrident_str, sysident_str); + return false; + } + else if (longhdr->xlp_seg_size != XLogSegSize) + { + report_invalid_record(state, + "WAL file is from different database system: Incorrect XLOG_SEG_SIZE in page header."); + return false; + } + else if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ) + { + report_invalid_record(state, + "WAL file is from different database system: Incorrect XLOG_BLCKSZ in page header."); + return false; + } + } + else if (offset == 0) + { + char fname[MAXFNAMELEN]; + + XLogFileName(fname, state->readPageTLI, segno); + + /* hmm, first page of file doesn't have a long header? */ + report_invalid_record(state, + "invalid info bits %04X in log segment %s, offset %u", + hdr->xlp_info, + fname, + offset); + return false; + } + + if (hdr->xlp_pageaddr != recaddr) + { + char fname[MAXFNAMELEN]; + + XLogFileName(fname, state->readPageTLI, segno); + + report_invalid_record(state, + "unexpected pageaddr %X/%X in log segment %s, offset %u", + (uint32) (hdr->xlp_pageaddr >> 32), (uint32) hdr->xlp_pageaddr, + fname, + offset); + return false; + } + + /* + * Since child timelines are always assigned a TLI greater than their + * immediate parent's TLI, we should never see TLI go backwards across + * successive pages of a consistent WAL sequence. + * + * Sometimes we re-read a segment that's already been (partially) read. So + * we only verify TLIs for pages that are later than the last remembered + * LSN. 
+ */ + if (recptr > state->latestPagePtr) + { + if (hdr->xlp_tli < state->latestPageTLI) + { + char fname[MAXFNAMELEN]; + + XLogFileName(fname, state->readPageTLI, segno); + + report_invalid_record(state, + "out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u", + hdr->xlp_tli, + state->latestPageTLI, + fname, + offset); + return false; + } + } + state->latestPagePtr = recptr; + state->latestPageTLI = hdr->xlp_tli; + + return true; +} + +#ifdef FRONTEND +/* + * Functions that are currently not needed in the backend, but are better + * implemented inside xlogreader.c because of the internal facilities available + * here. + */ + +/* + * Find the first record with an lsn >= RecPtr. + * + * Useful for checking whether RecPtr is a valid xlog address for reading and to + * find the first valid address after some address when dumping records for + * debugging purposes. + */ +XLogRecPtr +XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) +{ + XLogReaderState saved_state = *state; + XLogRecPtr targetPagePtr; + XLogRecPtr tmpRecPtr; + int targetRecOff; + XLogRecPtr found = InvalidXLogRecPtr; + uint32 pageHeaderSize; + XLogPageHeader header; + XLogRecord *record; + int readLen; + char *errormsg; + + Assert(!XLogRecPtrIsInvalid(RecPtr)); + + targetRecOff = RecPtr % XLOG_BLCKSZ; + + /* scroll back to page boundary */ + targetPagePtr = RecPtr - targetRecOff; + + /* Read the page containing the record */ + readLen = ReadPageInternal(state, targetPagePtr, targetRecOff); + if (readLen < 0) + goto err; + + header = (XLogPageHeader) state->readBuf; + + pageHeaderSize = XLogPageHeaderSize(header); + + /* make sure we have enough data for the page header */ + readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize); + if (readLen < 0) + goto err; + + /* skip over potential continuation data */ + if (header->xlp_info & XLP_FIRST_IS_CONTRECORD) + { + /* record headers are MAXALIGN'ed */ + tmpRecPtr = targetPagePtr + pageHeaderSize + +
MAXALIGN(header->xlp_rem_len); + } + else + { + tmpRecPtr = targetPagePtr + pageHeaderSize; + } + + /* + * we know now that tmpRecPtr is an address pointing to a valid XLogRecord + * because either we're at the first record after the beginning of a page + * or we just jumped over the remaining data of a continuation. + */ + while ((record = XLogReadRecord(state, tmpRecPtr, &errormsg))) + { + /* continue after the record */ + tmpRecPtr = InvalidXLogRecPtr; + + /* past the record we've found, break out */ + if (RecPtr <= state->ReadRecPtr) + { + found = state->ReadRecPtr; + goto out; + } + } + +err: +out: + /* Reset state to what we had before finding the record */ + state->readSegNo = 0; + state->readOff = 0; + state->readLen = 0; + state->ReadRecPtr = saved_state.ReadRecPtr; + state->EndRecPtr = saved_state.EndRecPtr; + + return found; +} + +#endif /* FRONTEND */ diff --git a/src/backend/nls.mk b/src/backend/nls.mk index 30f6a2bf9f..c072de7fa7 100644 --- a/src/backend/nls.mk +++ b/src/backend/nls.mk @@ -4,12 +4,13 @@ AVAIL_LANGUAGES = de es fr ja pt_BR tr zh_CN zh_TW GETTEXT_FILES = + gettext-files GETTEXT_TRIGGERS = $(BACKEND_COMMON_GETTEXT_TRIGGERS) \ GUC_check_errmsg GUC_check_errdetail GUC_check_errhint \ - write_stderr yyerror parser_yyerror + write_stderr yyerror parser_yyerror report_invalid_record GETTEXT_FLAGS = $(BACKEND_COMMON_GETTEXT_FLAGS) \ GUC_check_errmsg:1:c-format \ GUC_check_errdetail:1:c-format \ GUC_check_errhint:1:c-format \ - write_stderr:1:c-format + write_stderr:1:c-format \ + report_invalid_record:2:c-format gettext-files: distprep find $(srcdir)/ $(srcdir)/../port/ -name '*.c' -print | LC_ALL=C sort >$@ diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h new file mode 100644 index 0000000000..36907d6330 --- /dev/null +++ b/src/include/access/xlogreader.h @@ -0,0 +1,116 @@ +/*------------------------------------------------------------------------- + * + * xlogreader.h + * Definitions for the generic XLog reading 
facility + * + * Portions Copyright (c) 2013, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/include/access/xlogreader.h + * + * NOTES + * See the definition of the XLogReaderState struct for instructions on + * how to use the XLogReader infrastructure. + * + * The basic idea is to allocate an XLogReaderState via + * XLogReaderAllocate(), and call XLogReadRecord() until it returns NULL. + *------------------------------------------------------------------------- + */ +#ifndef XLOGREADER_H +#define XLOGREADER_H + +#include "access/xlog_internal.h" + +typedef struct XLogReaderState XLogReaderState; + +/* Function type definition for the read_page callback */ +typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader, + XLogRecPtr targetPagePtr, + int reqLen, + char *readBuf, + TimeLineID *pageTLI); + +struct XLogReaderState +{ + /* ---------------------------------------- + * Public parameters + * ---------------------------------------- + */ + + /* + * Data input callback (mandatory). + * + * This callback shall read at least reqLen valid bytes of the xlog page + * starting at targetPagePtr, and store them in readBuf. The callback + * shall return the number of bytes read (never more than XLOG_BLCKSZ), or + * -1 on failure. The callback shall sleep, if necessary, to wait for the + * requested bytes to become available. The callback will not be invoked + * again for the same page unless more than the returned number of bytes + * are necessary. + * + * *pageTLI should be set to the TLI of the file the page was read from. + * It is currently used only for error reporting purposes, to reconstruct + * the name of the WAL file where an error occurred. + */ + XLogPageReadCB read_page; + + /* + * System identifier of the xlog files we're about to read. Set to zero + * (the default value) if unknown or unimportant. + */ + uint64 system_identifier; + + /* + * Opaque data for callbacks to use. Not used by XLogReader. 
+ */ + void *private_data; + + /* + * Start and end point of last record read. EndRecPtr is also used as the + * position to read next, if XLogReadRecord receives an invalid recptr. + */ + XLogRecPtr ReadRecPtr; /* start of last record read */ + XLogRecPtr EndRecPtr; /* end+1 of last record read */ + + /* ---------------------------------------- + * private/internal state + * ---------------------------------------- + */ + + /* Buffer for currently read page (XLOG_BLCKSZ bytes) */ + char *readBuf; + + /* last read segment, segment offset, read length, TLI */ + XLogSegNo readSegNo; + uint32 readOff; + uint32 readLen; + TimeLineID readPageTLI; + + /* beginning of last page read, and its TLI */ + XLogRecPtr latestPagePtr; + TimeLineID latestPageTLI; + + /* Buffer for current ReadRecord result (expandable) */ + char *readRecordBuf; + uint32 readRecordBufSize; + + /* Buffer to hold error message */ + char *errormsg_buf; +}; + +/* Get a new XLogReader */ +extern XLogReaderState *XLogReaderAllocate(XLogPageReadCB pagereadfunc, + void *private_data); + +/* Free an XLogReader */ +extern void XLogReaderFree(XLogReaderState *state); + +/* Read the next XLog record. Returns NULL on end-of-WAL or failure */ +extern struct XLogRecord *XLogReadRecord(XLogReaderState *state, + XLogRecPtr recptr, char **errormsg); + +#ifdef FRONTEND +extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr); +#endif /* FRONTEND */ + +#endif /* XLOGREADER_H */