diff --git a/src/backend/access/transam/generic_xlog.c b/src/backend/access/transam/generic_xlog.c index 4b0c63817f..bbb542b322 100644 --- a/src/backend/access/transam/generic_xlog.c +++ b/src/backend/access/transam/generic_xlog.c @@ -482,10 +482,10 @@ generic_redo(XLogReaderState *record) uint8 block_id; /* Protect limited size of buffers[] array */ - Assert(record->max_block_id < MAX_GENERIC_XLOG_PAGES); + Assert(XLogRecMaxBlockId(record) < MAX_GENERIC_XLOG_PAGES); /* Iterate over blocks */ - for (block_id = 0; block_id <= record->max_block_id; block_id++) + for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++) { XLogRedoAction action; @@ -525,7 +525,7 @@ generic_redo(XLogReaderState *record) } /* Changes are done: unlock and release all buffers */ - for (block_id = 0; block_id <= record->max_block_id; block_id++) + for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++) { if (BufferIsValid(buffers[block_id])) UnlockReleaseBuffer(buffers[block_id]); diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index f436471b27..4ac3871c74 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -971,6 +971,8 @@ XLogInsertRecord(XLogRecData *rdata, if (XLOG_DEBUG) { static XLogReaderState *debug_reader = NULL; + XLogRecord *record; + DecodedXLogRecord *decoded; StringInfoData buf; StringInfoData recordBuf; char *errormsg = NULL; @@ -990,6 +992,11 @@ XLogInsertRecord(XLogRecData *rdata, for (; rdata != NULL; rdata = rdata->next) appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len); + /* We also need temporary space to decode the record. */ + record = (XLogRecord *) recordBuf.data; + decoded = (DecodedXLogRecord *) + palloc(DecodeXLogRecordRequiredSpace(record->xl_tot_len)); + if (!debug_reader) debug_reader = XLogReaderAllocate(wal_segment_size, NULL, XL_ROUTINE(), NULL); @@ -998,7 +1005,10 @@ XLogInsertRecord(XLogRecData *rdata, { appendStringInfoString(&buf, "error decoding record: out of memory while allocating a WAL reading processor"); } - else if (!DecodeXLogRecord(debug_reader, (XLogRecord *) recordBuf.data, + else if (!DecodeXLogRecord(debug_reader, + decoded, + record, + EndPos, &errormsg)) { appendStringInfo(&buf, "error decoding record: %s", @@ -1007,10 +1017,14 @@ XLogInsertRecord(XLogRecData *rdata, else { appendStringInfoString(&buf, " - "); + + debug_reader->record = decoded; xlog_outdesc(&buf, debug_reader); + debug_reader->record = NULL; } elog(LOG, "%s", buf.data); + pfree(decoded); pfree(buf.data); pfree(recordBuf.data); MemoryContextSwitchTo(oldCxt); @@ -7738,7 +7752,7 @@ xlog_redo(XLogReaderState *record) * resource manager needs to generate conflicts, it has to define a * separate WAL record type and redo routine. */ - for (uint8 block_id = 0; block_id <= record->max_block_id; block_id++) + for (uint8 block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++) { Buffer buffer; diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index b7c06da255..e437c42992 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -45,6 +45,7 @@ static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength); static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen); static void XLogReaderInvalReadState(XLogReaderState *state); +static XLogPageReadResult XLogDecodeNextRecord(XLogReaderState *state, bool non_blocking); static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess); static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record, @@ -56,6 +57,12 @@ static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, /* size of the buffer allocated for error message. */ #define MAX_ERRORMSG_LEN 1000 +/* + * Default size; large enough that typical users of XLogReader won't often need + * to use the 'oversized' memory allocation code path. + */ +#define DEFAULT_DECODE_BUFFER_SIZE (64 * 1024) + /* * Construct a string in state->errormsg_buf explaining what's wrong with * the current record being read. @@ -70,6 +77,24 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...) va_start(args, fmt); vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args); va_end(args); + + state->errormsg_deferred = true; +} + +/* + * Set the size of the decoding buffer. A pointer to a caller supplied memory + * region may also be passed in, in which case non-oversized records will be + * decoded there. + */ +void +XLogReaderSetDecodeBuffer(XLogReaderState *state, void *buffer, size_t size) +{ + Assert(state->decode_buffer == NULL); + + state->decode_buffer = buffer; + state->decode_buffer_size = size; + state->decode_buffer_tail = buffer; + state->decode_buffer_head = buffer; } /* @@ -92,8 +117,6 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, /* initialize caller-provided support functions */ state->routine = *routine; - state->max_block_id = -1; - /* * Permanently allocate readBuf. We do it this way, rather than just * making a static array, for two reasons: (1) no need to waste the @@ -144,18 +167,11 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, void XLogReaderFree(XLogReaderState *state) { - int block_id; - if (state->seg.ws_file != -1) state->routine.segment_close(state); - for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++) - { - if (state->blocks[block_id].data) - pfree(state->blocks[block_id].data); - } - if (state->main_data) - pfree(state->main_data); + if (state->decode_buffer && state->free_decode_buffer) + pfree(state->decode_buffer); pfree(state->errormsg_buf); if (state->readRecordBuf) @@ -251,7 +267,133 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) /* Begin at the passed-in record pointer. */ state->EndRecPtr = RecPtr; + state->NextRecPtr = RecPtr; state->ReadRecPtr = InvalidXLogRecPtr; + state->DecodeRecPtr = InvalidXLogRecPtr; +} + +/* + * See if we can release the last record that was returned by + * XLogNextRecord(), if any, to free up space. + */ +void +XLogReleasePreviousRecord(XLogReaderState *state) +{ + DecodedXLogRecord *record; + + if (!state->record) + return; + + /* + * Remove it from the decoded record queue. It must be the oldest item + * decoded, decode_queue_head. + */ + record = state->record; + Assert(record == state->decode_queue_head); + state->record = NULL; + state->decode_queue_head = record->next; + + /* It might also be the newest item decoded, decode_queue_tail. */ + if (state->decode_queue_tail == record) + state->decode_queue_tail = NULL; + + /* Release the space. */ + if (unlikely(record->oversized)) + { + /* It's not in the the decode buffer, so free it to release space. */ + pfree(record); + } + else + { + /* It must be the head (oldest) record in the decode buffer. */ + Assert(state->decode_buffer_head == (char *) record); + + /* + * We need to update head to point to the next record that is in the + * decode buffer, if any, being careful to skip oversized ones + * (they're not in the decode buffer). + */ + record = record->next; + while (unlikely(record && record->oversized)) + record = record->next; + + if (record) + { + /* Adjust head to release space up to the next record. */ + state->decode_buffer_head = (char *) record; + } + else + { + /* + * Otherwise we might as well just reset head and tail to the + * start of the buffer space, because we're empty. This means + * we'll keep overwriting the same piece of memory if we're not + * doing any prefetching. + */ + state->decode_buffer_head = state->decode_buffer; + state->decode_buffer_tail = state->decode_buffer; + } + } +} + +/* + * Attempt to read an XLOG record. + * + * XLogBeginRead() or XLogFindNextRecord() and then XLogReadAhead() must be + * called before the first call to XLogNextRecord(). This functions returns + * records and errors that were put into an internal queue by XLogReadAhead(). + * + * On success, a record is returned. + * + * The returned record (or *errormsg) points to an internal buffer that's + * valid until the next call to XLogNextRecord. + */ +DecodedXLogRecord * +XLogNextRecord(XLogReaderState *state, char **errormsg) +{ + /* Release the last record returned by XLogNextRecord(). */ + XLogReleasePreviousRecord(state); + + if (state->decode_queue_head == NULL) + { + *errormsg = NULL; + if (state->errormsg_deferred) + { + if (state->errormsg_buf[0] != '\0') + *errormsg = state->errormsg_buf; + state->errormsg_deferred = false; + } + + /* + * state->EndRecPtr is expected to have been set by the last call to + * XLogBeginRead() or XLogNextRecord(), and is the location of the + * error. + */ + Assert(!XLogRecPtrIsInvalid(state->EndRecPtr)); + + return NULL; + } + + /* + * Record this as the most recent record returned, so that we'll release + * it next time. This also exposes it to the traditional + * XLogRecXXX(xlogreader) macros, which work with the decoder rather than + * the record for historical reasons. + */ + state->record = state->decode_queue_head; + + /* + * Update the pointers to the beginning and one-past-the-end of this + * record, again for the benefit of historical code that expected the + * decoder to track this rather than accessing these fields of the record + * itself. + */ + state->ReadRecPtr = state->record->lsn; + state->EndRecPtr = state->record->next_lsn; + + *errormsg = NULL; + + return state->record; } /* @@ -272,6 +414,119 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) */ XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg) +{ + DecodedXLogRecord *decoded; + + /* + * Release last returned record, if there is one. We need to do this so + * that we can check for empty decode queue accurately. + */ + XLogReleasePreviousRecord(state); + + /* + * Call XLogReadAhead() in blocking mode to make sure there is something + * in the queue, though we don't use the result. + */ + if (!XLogReaderHasQueuedRecordOrError(state)) + XLogReadAhead(state, false /* nonblocking */ ); + + /* Consume the head record or error. */ + decoded = XLogNextRecord(state, errormsg); + if (decoded) + { + /* + * This function returns a pointer to the record's header, not the + * actual decoded record. The caller will access the decoded record + * through the XLogRecGetXXX() macros, which reach the decoded + * recorded as xlogreader->record. + */ + Assert(state->record == decoded); + return &decoded->header; + } + + return NULL; +} + +/* + * Allocate space for a decoded record. The only member of the returned + * object that is initialized is the 'oversized' flag, indicating that the + * decoded record wouldn't fit in the decode buffer and must eventually be + * freed explicitly. + * + * The caller is responsible for adjusting decode_buffer_tail with the real + * size after successfully decoding a record into this space. This way, if + * decoding fails, then there is nothing to undo unless the 'oversized' flag + * was set and pfree() must be called. + * + * Return NULL if there is no space in the decode buffer and allow_oversized + * is false, or if memory allocation fails for an oversized buffer. + */ +static DecodedXLogRecord * +XLogReadRecordAlloc(XLogReaderState *state, size_t xl_tot_len, bool allow_oversized) +{ + size_t required_space = DecodeXLogRecordRequiredSpace(xl_tot_len); + DecodedXLogRecord *decoded = NULL; + + /* Allocate a circular decode buffer if we don't have one already. */ + if (unlikely(state->decode_buffer == NULL)) + { + if (state->decode_buffer_size == 0) + state->decode_buffer_size = DEFAULT_DECODE_BUFFER_SIZE; + state->decode_buffer = palloc(state->decode_buffer_size); + state->decode_buffer_head = state->decode_buffer; + state->decode_buffer_tail = state->decode_buffer; + state->free_decode_buffer = true; + } + + /* Try to allocate space in the circular decode buffer. */ + if (state->decode_buffer_tail >= state->decode_buffer_head) + { + /* Empty, or tail is to the right of head. */ + if (state->decode_buffer_tail + required_space <= + state->decode_buffer + state->decode_buffer_size) + { + /* There is space between tail and end. */ + decoded = (DecodedXLogRecord *) state->decode_buffer_tail; + decoded->oversized = false; + return decoded; + } + else if (state->decode_buffer + required_space < + state->decode_buffer_head) + { + /* There is space between start and head. */ + decoded = (DecodedXLogRecord *) state->decode_buffer; + decoded->oversized = false; + return decoded; + } + } + else + { + /* Tail is to the left of head. */ + if (state->decode_buffer_tail + required_space < + state->decode_buffer_head) + { + /* There is space between tail and head. */ + decoded = (DecodedXLogRecord *) state->decode_buffer_tail; + decoded->oversized = false; + return decoded; + } + } + + /* Not enough space in the decode buffer. Are we allowed to allocate? */ + if (allow_oversized) + { + decoded = palloc_extended(required_space, MCXT_ALLOC_NO_OOM); + if (decoded == NULL) + return NULL; + decoded->oversized = true; + return decoded; + } + + return NULL; +} + +static XLogPageReadResult +XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking) { XLogRecPtr RecPtr; XLogRecord *record; @@ -284,6 +539,8 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) bool assembled; bool gotheader; int readOff; + DecodedXLogRecord *decoded; + char *errormsg; /* not used */ /* * randAccess indicates whether to verify the previous-record pointer of @@ -293,21 +550,20 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) randAccess = false; /* reset error state */ - *errormsg = NULL; state->errormsg_buf[0] = '\0'; + decoded = NULL; - ResetDecoder(state); state->abortedRecPtr = InvalidXLogRecPtr; state->missingContrecPtr = InvalidXLogRecPtr; - RecPtr = state->EndRecPtr; + RecPtr = state->NextRecPtr; - if (state->ReadRecPtr != InvalidXLogRecPtr) + if (state->DecodeRecPtr != InvalidXLogRecPtr) { /* read the record after the one we just read */ /* - * EndRecPtr is pointing to end+1 of the previous WAL record. If + * NextRecPtr is pointing to end+1 of the previous WAL record. If * we're at a page boundary, no more records can fit on the current * page. We must skip over the page header, but we can't do that until * we've read in the page, since the header size is variable. @@ -318,7 +574,7 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) /* * Caller supplied a position to start at. * - * In this case, EndRecPtr should already be pointing to a valid + * In this case, NextRecPtr should already be pointing to a valid * record starting position. */ Assert(XRecOffIsValid(RecPtr)); @@ -326,6 +582,7 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) } restart: + state->nonblocking = nonblocking; state->currRecPtr = RecPtr; assembled = false; @@ -339,7 +596,9 @@ restart: */ readOff = ReadPageInternal(state, targetPagePtr, Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ)); - if (readOff < 0) + if (readOff == XLREAD_WOULDBLOCK) + return XLREAD_WOULDBLOCK; + else if (readOff < 0) goto err; /* @@ -395,7 +654,7 @@ restart: */ if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord) { - if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record, + if (!ValidXLogRecordHeader(state, RecPtr, state->DecodeRecPtr, record, randAccess)) goto err; gotheader = true; @@ -414,6 +673,31 @@ restart: gotheader = false; } + /* + * Find space to decode this record. Don't allow oversized allocation if + * the caller requested nonblocking. Otherwise, we *have* to try to + * decode the record now because the caller has nothing else to do, so + * allow an oversized record to be palloc'd if that turns out to be + * necessary. + */ + decoded = XLogReadRecordAlloc(state, + total_len, + !nonblocking /* allow_oversized */ ); + if (decoded == NULL) + { + /* + * There is no space in the decode buffer. The caller should help + * with that problem by consuming some records. + */ + if (nonblocking) + return XLREAD_WOULDBLOCK; + + /* We failed to allocate memory for an oversized record. */ + report_invalid_record(state, + "out of memory while trying to decode a record of length %u", total_len); + goto err; + } + len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ; if (total_len > len) { @@ -453,7 +737,9 @@ restart: Min(total_len - gotlen + SizeOfXLogShortPHD, XLOG_BLCKSZ)); - if (readOff < 0) + if (readOff == XLREAD_WOULDBLOCK) + return XLREAD_WOULDBLOCK; + else if (readOff < 0) goto err; Assert(SizeOfXLogShortPHD <= readOff); @@ -471,7 +757,6 @@ restart: if (pageHeader->xlp_info & XLP_FIRST_IS_OVERWRITE_CONTRECORD) { state->overwrittenRecPtr = RecPtr; - ResetDecoder(state); RecPtr = targetPagePtr; goto restart; } @@ -526,7 +811,7 @@ restart: if (!gotheader) { record = (XLogRecord *) state->readRecordBuf; - if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, + if (!ValidXLogRecordHeader(state, RecPtr, state->DecodeRecPtr, record, randAccess)) goto err; gotheader = true; @@ -540,8 +825,8 @@ restart: goto err; pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf); - state->ReadRecPtr = RecPtr; - state->EndRecPtr = targetPagePtr + pageHeaderSize + state->DecodeRecPtr = RecPtr; + state->NextRecPtr = targetPagePtr + pageHeaderSize + MAXALIGN(pageHeader->xlp_rem_len); } else @@ -549,16 +834,18 @@ restart: /* Wait for the record data to become available */ readOff = ReadPageInternal(state, targetPagePtr, Min(targetRecOff + total_len, XLOG_BLCKSZ)); - if (readOff < 0) + if (readOff == XLREAD_WOULDBLOCK) + return XLREAD_WOULDBLOCK; + else if (readOff < 0) goto err; /* Record does not cross a page boundary */ if (!ValidXLogRecord(state, record, RecPtr)) goto err; - state->EndRecPtr = RecPtr + MAXALIGN(total_len); + state->NextRecPtr = RecPtr + MAXALIGN(total_len); - state->ReadRecPtr = RecPtr; + state->DecodeRecPtr = RecPtr; } /* @@ -568,14 +855,40 @@ restart: (record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH) { /* Pretend it extends to end of segment */ - state->EndRecPtr += state->segcxt.ws_segsize - 1; - state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize); + state->NextRecPtr += state->segcxt.ws_segsize - 1; + state->NextRecPtr -= XLogSegmentOffset(state->NextRecPtr, state->segcxt.ws_segsize); } - if (DecodeXLogRecord(state, record, errormsg)) - return record; + if (DecodeXLogRecord(state, decoded, record, RecPtr, &errormsg)) + { + /* Record the location of the next record. */ + decoded->next_lsn = state->NextRecPtr; + + /* + * If it's in the decode buffer, mark the decode buffer space as + * occupied. + */ + if (!decoded->oversized) + { + /* The new decode buffer head must be MAXALIGNed. */ + Assert(decoded->size == MAXALIGN(decoded->size)); + if ((char *) decoded == state->decode_buffer) + state->decode_buffer_tail = state->decode_buffer + decoded->size; + else + state->decode_buffer_tail += decoded->size; + } + + /* Insert it into the queue of decoded records. */ + Assert(state->decode_queue_tail != decoded); + if (state->decode_queue_tail) + state->decode_queue_tail->next = decoded; + state->decode_queue_tail = decoded; + if (!state->decode_queue_head) + state->decode_queue_head = decoded; + return XLREAD_SUCCESS; + } else - return NULL; + return XLREAD_FAIL; err: if (assembled) @@ -593,14 +906,46 @@ err: state->missingContrecPtr = targetPagePtr; } + if (decoded && decoded->oversized) + pfree(decoded); + /* * Invalidate the read state. We might read from a different source after * failure. */ XLogReaderInvalReadState(state); - if (state->errormsg_buf[0] != '\0') - *errormsg = state->errormsg_buf; + /* + * If an error was written to errmsg_buf, it'll be returned to the caller + * of XLogReadRecord() after all successfully decoded records from the + * read queue. + */ + + return XLREAD_FAIL; +} + +/* + * Try to decode the next available record, and return it. The record will + * also be returned to XLogNextRecord(), which must be called to 'consume' + * each record. + * + * If nonblocking is true, may return NULL due to lack of data or WAL decoding + * space. + */ +DecodedXLogRecord * +XLogReadAhead(XLogReaderState *state, bool nonblocking) +{ + XLogPageReadResult result; + + if (state->errormsg_deferred) + return NULL; + + result = XLogDecodeNextRecord(state, nonblocking); + if (result == XLREAD_SUCCESS) + { + Assert(state->decode_queue_tail != NULL); + return state->decode_queue_tail; + } return NULL; } @@ -609,8 +954,14 @@ err: * Read a single xlog page including at least [pageptr, reqLen] of valid data * via the page_read() callback. * - * Returns -1 if the required page cannot be read for some reason; errormsg_buf - * is set in that case (unless the error occurs in the page_read callback). + * Returns XLREAD_FAIL if the required page cannot be read for some + * reason; errormsg_buf is set in that case (unless the error occurs in the + * page_read callback). + * + * Returns XLREAD_WOULDBLOCK if the requested data can't be read without + * waiting. This can be returned only if the installed page_read callback + * respects the state->nonblocking flag, and cannot read the requested data + * immediately. * * We fetch the page from a reader-local cache if we know we have the required * data and if there hasn't been any error since caching the data. @@ -652,7 +1003,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ, state->currRecPtr, state->readBuf); - if (readLen < 0) + if (readLen == XLREAD_WOULDBLOCK) + return XLREAD_WOULDBLOCK; + else if (readLen < 0) goto err; /* we can be sure to have enough WAL available, we scrolled back */ @@ -670,7 +1023,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD), state->currRecPtr, state->readBuf); - if (readLen < 0) + if (readLen == XLREAD_WOULDBLOCK) + return XLREAD_WOULDBLOCK; + else if (readLen < 0) goto err; Assert(readLen <= XLOG_BLCKSZ); @@ -689,7 +1044,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr), state->currRecPtr, state->readBuf); - if (readLen < 0) + if (readLen == XLREAD_WOULDBLOCK) + return XLREAD_WOULDBLOCK; + else if (readLen < 0) goto err; } @@ -707,8 +1064,12 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) return readLen; err: - XLogReaderInvalReadState(state); - return -1; + if (state->errormsg_buf[0] != '\0') + { + state->errormsg_deferred = true; + XLogReaderInvalReadState(state); + } + return XLREAD_FAIL; } /* @@ -987,6 +1348,9 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) Assert(!XLogRecPtrIsInvalid(RecPtr)); + /* Make sure ReadPageInternal() can't return XLREAD_WOULDBLOCK. */ + state->nonblocking = false; + /* * skip over potential continuation data, keeping in mind that it may span * multiple pages @@ -1187,34 +1551,83 @@ WALRead(XLogReaderState *state, * ---------------------------------------- */ -/* private function to reset the state between records */ +/* + * Private function to reset the state, forgetting all decoded records, if we + * are asked to move to a new read position. + */ static void ResetDecoder(XLogReaderState *state) { - int block_id; + DecodedXLogRecord *r; - state->decoded_record = NULL; - - state->main_data_len = 0; - - for (block_id = 0; block_id <= state->max_block_id; block_id++) + /* Reset the decoded record queue, freeing any oversized records. */ + while ((r = state->decode_queue_head) != NULL) { - state->blocks[block_id].in_use = false; - state->blocks[block_id].has_image = false; - state->blocks[block_id].has_data = false; - state->blocks[block_id].apply_image = false; + state->decode_queue_head = r->next; + if (r->oversized) + pfree(r); } - state->max_block_id = -1; + state->decode_queue_tail = NULL; + state->decode_queue_head = NULL; + state->record = NULL; + + /* Reset the decode buffer to empty. */ + state->decode_buffer_tail = state->decode_buffer; + state->decode_buffer_head = state->decode_buffer; + + /* Clear error state. */ + state->errormsg_buf[0] = '\0'; + state->errormsg_deferred = false; } /* - * Decode the previously read record. + * Compute the maximum possible amount of padding that could be required to + * decode a record, given xl_tot_len from the record's header. This is the + * amount of output buffer space that we need to decode a record, though we + * might not finish up using it all. + * + * This computation is pessimistic and assumes the maximum possible number of + * blocks, due to lack of better information. + */ +size_t +DecodeXLogRecordRequiredSpace(size_t xl_tot_len) +{ + size_t size = 0; + + /* Account for the fixed size part of the decoded record struct. */ + size += offsetof(DecodedXLogRecord, blocks[0]); + /* Account for the flexible blocks array of maximum possible size. */ + size += sizeof(DecodedBkpBlock) * (XLR_MAX_BLOCK_ID + 1); + /* Account for all the raw main and block data. */ + size += xl_tot_len; + /* We might insert padding before main_data. */ + size += (MAXIMUM_ALIGNOF - 1); + /* We might insert padding before each block's data. */ + size += (MAXIMUM_ALIGNOF - 1) * (XLR_MAX_BLOCK_ID + 1); + /* We might insert padding at the end. */ + size += (MAXIMUM_ALIGNOF - 1); + + return size; +} + +/* + * Decode a record. "decoded" must point to a MAXALIGNed memory area that has + * space for at least DecodeXLogRecordRequiredSpace(record) bytes. On + * success, decoded->size contains the actual space occupied by the decoded + * record, which may turn out to be less. + * + * Only decoded->oversized member must be initialized already, and will not be + * modified. Other members will be initialized as required. * * On error, a human-readable error message is returned in *errormsg, and * the return value is false. */ bool -DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) +DecodeXLogRecord(XLogReaderState *state, + DecodedXLogRecord *decoded, + XLogRecord *record, + XLogRecPtr lsn, + char **errormsg) { /* * read next _size bytes from record buffer, but check for overrun first. @@ -1229,17 +1642,20 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) } while(0) char *ptr; + char *out; uint32 remaining; uint32 datatotal; RelFileNode *rnode = NULL; uint8 block_id; - ResetDecoder(state); - - state->decoded_record = record; - state->record_origin = InvalidRepOriginId; - state->toplevel_xid = InvalidTransactionId; - + decoded->header = *record; + decoded->lsn = lsn; + decoded->next = NULL; + decoded->record_origin = InvalidRepOriginId; + decoded->toplevel_xid = InvalidTransactionId; + decoded->main_data = NULL; + decoded->main_data_len = 0; + decoded->max_block_id = -1; ptr = (char *) record; ptr += SizeOfXLogRecord; remaining = record->xl_tot_len - SizeOfXLogRecord; @@ -1257,7 +1673,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) COPY_HEADER_FIELD(&main_data_len, sizeof(uint8)); - state->main_data_len = main_data_len; + decoded->main_data_len = main_data_len; datatotal += main_data_len; break; /* by convention, the main data fragment is * always last */ @@ -1268,18 +1684,18 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) uint32 main_data_len; COPY_HEADER_FIELD(&main_data_len, sizeof(uint32)); - state->main_data_len = main_data_len; + decoded->main_data_len = main_data_len; datatotal += main_data_len; break; /* by convention, the main data fragment is * always last */ } else if (block_id == XLR_BLOCK_ID_ORIGIN) { - COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId)); + COPY_HEADER_FIELD(&decoded->record_origin, sizeof(RepOriginId)); } else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID) { - COPY_HEADER_FIELD(&state->toplevel_xid, sizeof(TransactionId)); + COPY_HEADER_FIELD(&decoded->toplevel_xid, sizeof(TransactionId)); } else if (block_id <= XLR_MAX_BLOCK_ID) { @@ -1287,7 +1703,11 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) DecodedBkpBlock *blk; uint8 fork_flags; - if (block_id <= state->max_block_id) + /* mark any intervening block IDs as not in use */ + for (int i = decoded->max_block_id + 1; i < block_id; ++i) + decoded->blocks[i].in_use = false; + + if (block_id <= decoded->max_block_id) { report_invalid_record(state, "out-of-order block_id %u at %X/%X", @@ -1295,9 +1715,9 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) LSN_FORMAT_ARGS(state->ReadRecPtr)); goto err; } - state->max_block_id = block_id; + decoded->max_block_id = block_id; - blk = &state->blocks[block_id]; + blk = &decoded->blocks[block_id]; blk->in_use = true; blk->apply_image = false; @@ -1440,17 +1860,18 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) /* * Ok, we've parsed the fragment headers, and verified that the total * length of the payload in the fragments is equal to the amount of data - * left. Copy the data of each fragment to a separate buffer. - * - * We could just set up pointers into readRecordBuf, but we want to align - * the data for the convenience of the callers. Backup images are not - * copied, however; they don't need alignment. + * left. Copy the data of each fragment to contiguous space after the + * blocks array, inserting alignment padding before the data fragments so + * they can be cast to struct pointers by REDO routines. */ + out = ((char *) decoded) + + offsetof(DecodedXLogRecord, blocks) + + sizeof(decoded->blocks[0]) * (decoded->max_block_id + 1); /* block data first */ - for (block_id = 0; block_id <= state->max_block_id; block_id++) + for (block_id = 0; block_id <= decoded->max_block_id; block_id++) { - DecodedBkpBlock *blk = &state->blocks[block_id]; + DecodedBkpBlock *blk = &decoded->blocks[block_id]; if (!blk->in_use) continue; @@ -1459,58 +1880,37 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) if (blk->has_image) { - blk->bkp_image = ptr; + /* no need to align image */ + blk->bkp_image = out; + memcpy(out, ptr, blk->bimg_len); ptr += blk->bimg_len; + out += blk->bimg_len; } if (blk->has_data) { - if (!blk->data || blk->data_len > blk->data_bufsz) - { - if (blk->data) - pfree(blk->data); - - /* - * Force the initial request to be BLCKSZ so that we don't - * waste time with lots of trips through this stanza as a - * result of WAL compression. - */ - blk->data_bufsz = MAXALIGN(Max(blk->data_len, BLCKSZ)); - blk->data = palloc(blk->data_bufsz); - } + out = (char *) MAXALIGN(out); + blk->data = out; memcpy(blk->data, ptr, blk->data_len); ptr += blk->data_len; + out += blk->data_len; } } /* and finally, the main data */ - if (state->main_data_len > 0) + if (decoded->main_data_len > 0) { - if (!state->main_data || state->main_data_len > state->main_data_bufsz) - { - if (state->main_data) - pfree(state->main_data); - - /* - * main_data_bufsz must be MAXALIGN'ed. In many xlog record - * types, we omit trailing struct padding on-disk to save a few - * bytes; but compilers may generate accesses to the xlog struct - * that assume that padding bytes are present. If the palloc - * request is not large enough to include such padding bytes then - * we'll get valgrind complaints due to otherwise-harmless fetches - * of the padding bytes. - * - * In addition, force the initial request to be reasonably large - * so that we don't waste time with lots of trips through this - * stanza. BLCKSZ / 2 seems like a good compromise choice. - */ - state->main_data_bufsz = MAXALIGN(Max(state->main_data_len, - BLCKSZ / 2)); - state->main_data = palloc(state->main_data_bufsz); - } - memcpy(state->main_data, ptr, state->main_data_len); - ptr += state->main_data_len; + out = (char *) MAXALIGN(out); + decoded->main_data = out; + memcpy(decoded->main_data, ptr, decoded->main_data_len); + ptr += decoded->main_data_len; + out += decoded->main_data_len; } + /* Report the actual size we used. */ + decoded->size = MAXALIGN(out - (char *) decoded); + Assert(DecodeXLogRecordRequiredSpace(record->xl_tot_len) >= + decoded->size); + return true; shortdata_err: @@ -1536,10 +1936,11 @@ XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id, { DecodedBkpBlock *bkpb; - if (!record->blocks[block_id].in_use) + if (block_id > record->record->max_block_id || + !record->record->blocks[block_id].in_use) return false; - bkpb = &record->blocks[block_id]; + bkpb = &record->record->blocks[block_id]; if (rnode) *rnode = bkpb->rnode; if (forknum) @@ -1559,10 +1960,11 @@ XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len) { DecodedBkpBlock *bkpb; - if (!record->blocks[block_id].in_use) + if (block_id > record->record->max_block_id || + !record->record->blocks[block_id].in_use) return NULL; - bkpb = &record->blocks[block_id]; + bkpb = &record->record->blocks[block_id]; if (!bkpb->has_data) { @@ -1590,12 +1992,13 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page) char *ptr; PGAlignedBlock tmp; - if (!record->blocks[block_id].in_use) + if (block_id > record->record->max_block_id || + !record->record->blocks[block_id].in_use) return false; - if (!record->blocks[block_id].has_image) + if (!record->record->blocks[block_id].has_image) return false; - bkpb = &record->blocks[block_id]; + bkpb = &record->record->blocks[block_id]; ptr = bkpb->bkp_image; if (BKPIMAGE_COMPRESSED(bkpb->bimg_info)) diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index f9f212680b..9feea3e6ec 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -2139,7 +2139,7 @@ xlog_block_info(StringInfo buf, XLogReaderState *record) int block_id; /* decode block references */ - for (block_id = 0; block_id <= record->max_block_id; block_id++) + for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++) { RelFileNode rnode; ForkNumber forknum; @@ -2271,7 +2271,7 @@ verifyBackupPageConsistency(XLogReaderState *record) Assert((XLogRecGetInfo(record) & XLR_CHECK_CONSISTENCY) != 0); - for (block_id = 0; block_id <= record->max_block_id; block_id++) + for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++) { Buffer buf; Page page; diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 54d5f20734..511f2f186f 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -370,7 +370,7 @@ XLogReadBufferForRedoExtended(XLogReaderState *record, * going to initialize it. And vice versa. */ zeromode = (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK); - willinit = (record->blocks[block_id].flags & BKPBLOCK_WILL_INIT) != 0; + willinit = (XLogRecGetBlock(record, block_id)->flags & BKPBLOCK_WILL_INIT) != 0; if (willinit && !zeromode) elog(PANIC, "block with WILL_INIT flag in WAL record must be zeroed by redo routine"); if (!willinit && zeromode) diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 8c00a73cb9..77bc7aea7a 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -111,7 +111,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor { ReorderBufferAssignChild(ctx->reorder, txid, - record->decoded_record->xl_xid, + XLogRecGetXid(record), buf.origptr); } diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 56df08c64f..7cfa169e9b 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -432,7 +432,7 @@ extractPageInfo(XLogReaderState *record) RmgrNames[rmid], info); } - for (block_id = 0; block_id <= record->max_block_id; block_id++) + for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++) { RelFileNode rnode; ForkNumber forknum; diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index f128050b4e..fc081adfb8 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -403,14 +403,13 @@ XLogDumpRecordLen(XLogReaderState *record, uint32 *rec_len, uint32 *fpi_len) * Calculate the amount of FPI data in the record. * * XXX: We peek into xlogreader's private decoded backup blocks for the - * bimg_len indicating the length of FPI data. It doesn't seem worth it to - * add an accessor macro for this. + * bimg_len indicating the length of FPI data. */ *fpi_len = 0; - for (block_id = 0; block_id <= record->max_block_id; block_id++) + for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++) { if (XLogRecHasBlockImage(record, block_id)) - *fpi_len += record->blocks[block_id].bimg_len; + *fpi_len += XLogRecGetBlock(record, block_id)->bimg_len; } /* @@ -508,7 +507,7 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record) if (!config->bkp_details) { /* print block references (short format) */ - for (block_id = 0; block_id <= record->max_block_id; block_id++) + for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++) { if (!XLogRecHasBlockRef(record, block_id)) continue; @@ -539,7 +538,7 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record) { /* print block references (detailed format) */ putchar('\n'); - for (block_id = 0; block_id <= record->max_block_id; block_id++) + for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++) { if (!XLogRecHasBlockRef(record, block_id)) continue; @@ -552,7 +551,7 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record) blk); if (XLogRecHasBlockImage(record, block_id)) { - uint8 bimg_info = record->blocks[block_id].bimg_info; + uint8 bimg_info = XLogRecGetBlock(record, block_id)->bimg_info; if (BKPIMAGE_COMPRESSED(bimg_info)) { @@ -571,11 +570,11 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record) "compression saved: %u, method: %s", XLogRecBlockImageApply(record, block_id) ? "" : " for WAL verification", - record->blocks[block_id].hole_offset, - record->blocks[block_id].hole_length, + XLogRecGetBlock(record, block_id)->hole_offset, + XLogRecGetBlock(record, block_id)->hole_length, BLCKSZ - - record->blocks[block_id].hole_length - - record->blocks[block_id].bimg_len, + XLogRecGetBlock(record, block_id)->hole_length - + XLogRecGetBlock(record, block_id)->bimg_len, method); } else @@ -583,8 +582,8 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record) printf(" (FPW%s); hole: offset: %u, length: %u", XLogRecBlockImageApply(record, block_id) ? "" : " for WAL verification", - record->blocks[block_id].hole_offset, - record->blocks[block_id].hole_length); + XLogRecGetBlock(record, block_id)->hole_offset, + XLogRecGetBlock(record, block_id)->hole_length); } } putchar('\n'); diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 477f0efe26..f4388cc9be 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -144,6 +144,30 @@ typedef struct uint16 data_bufsz; } DecodedBkpBlock; +/* + * The decoded contents of a record. This occupies a contiguous region of + * memory, with main_data and blocks[n].data pointing to memory after the + * members declared here. + */ +typedef struct DecodedXLogRecord +{ + /* Private member used for resource management. */ + size_t size; /* total size of decoded record */ + bool oversized; /* outside the regular decode buffer? */ + struct DecodedXLogRecord *next; /* decoded record queue link */ + + /* Public members. */ + XLogRecPtr lsn; /* location */ + XLogRecPtr next_lsn; /* location of next record */ + XLogRecord header; /* header */ + RepOriginId record_origin; + TransactionId toplevel_xid; /* XID of top-level transaction */ + char *main_data; /* record's main data portion */ + uint32 main_data_len; /* main data portion's length */ + int max_block_id; /* highest block_id in use (-1 if none) */ + DecodedBkpBlock blocks[FLEXIBLE_ARRAY_MEMBER]; +} DecodedXLogRecord; + struct XLogReaderState { /* @@ -171,6 +195,9 @@ struct XLogReaderState * Start and end point of last record read. EndRecPtr is also used as the * position to read next. Calling XLogBeginRead() sets EndRecPtr to the * starting position and ReadRecPtr to invalid. + * + * Start and end point of last record returned by XLogReadRecord(). These + * are also available as record->lsn and record->next_lsn. */ XLogRecPtr ReadRecPtr; /* start of last record read */ XLogRecPtr EndRecPtr; /* end+1 of last record read */ @@ -192,27 +219,43 @@ struct XLogReaderState * Use XLogRecGet* functions to investigate the record; these fields * should not be accessed directly. * ---------------------------------------- + * Start and end point of the last record read and decoded by + * XLogReadRecordInternal(). NextRecPtr is also used as the position to + * decode next. Calling XLogBeginRead() sets NextRecPtr and EndRecPtr to + * the requested starting position. */ - XLogRecord *decoded_record; /* currently decoded record */ + XLogRecPtr DecodeRecPtr; /* start of last record decoded */ + XLogRecPtr NextRecPtr; /* end+1 of last record decoded */ + XLogRecPtr PrevRecPtr; /* start of previous record decoded */ - char *main_data; /* record's main data portion */ - uint32 main_data_len; /* main data portion's length */ - uint32 main_data_bufsz; /* allocated size of the buffer */ - - RepOriginId record_origin; - - TransactionId toplevel_xid; /* XID of top-level transaction */ - - /* information about blocks referenced by the record. */ - DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1]; - - int max_block_id; /* highest block_id in use (-1 if none) */ + /* Last record returned by XLogReadRecord(). */ + DecodedXLogRecord *record; /* ---------------------------------------- * private/internal state * ---------------------------------------- */ + /* + * Buffer for decoded records. This is a circular buffer, though + * individual records can't be split in the middle, so some space is often + * wasted at the end. Oversized records that don't fit in this space are + * allocated separately. + */ + char *decode_buffer; + size_t decode_buffer_size; + bool free_decode_buffer; /* need to free? */ + char *decode_buffer_head; /* data is read from the head */ + char *decode_buffer_tail; /* new data is written at the tail */ + + /* + * Queue of records that have been decoded. This is a linked list that + * usually consists of consecutive records in decode_buffer, but may also + * contain oversized records allocated with palloc(). + */ + DecodedXLogRecord *decode_queue_head; /* oldest decoded record */ + DecodedXLogRecord *decode_queue_tail; /* newest decoded record */ + /* * Buffer for currently read page (XLOG_BLCKSZ bytes, valid up to at least * readLen bytes) @@ -262,8 +305,24 @@ struct XLogReaderState /* Buffer to hold error message */ char *errormsg_buf; + bool errormsg_deferred; + + /* + * Flag to indicate to XLogPageReadCB that it should not block waiting for + * data. + */ + bool nonblocking; }; +/* + * Check if XLogNextRecord() has any more queued records or an error to return. + */ +static inline bool +XLogReaderHasQueuedRecordOrError(XLogReaderState *state) +{ + return (state->decode_queue_head != NULL) || state->errormsg_deferred; +} + /* Get a new XLogReader */ extern XLogReaderState *XLogReaderAllocate(int wal_segment_size, const char *waldir, @@ -274,16 +333,40 @@ extern XLogReaderRoutine *LocalXLogReaderRoutine(void); /* Free an XLogReader */ extern void XLogReaderFree(XLogReaderState *state); +/* Optionally provide a circular decoding buffer to allow readahead. */ +extern void XLogReaderSetDecodeBuffer(XLogReaderState *state, + void *buffer, + size_t size); + /* Position the XLogReader to given record */ extern void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr); #ifdef FRONTEND extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr); #endif /* FRONTEND */ +/* Return values from XLogPageReadCB. */ +typedef enum XLogPageReadResult +{ + XLREAD_SUCCESS = 0, /* record is successfully read */ + XLREAD_FAIL = -1, /* failed during reading a record */ + XLREAD_WOULDBLOCK = -2 /* nonblocking mode only, no data */ +} XLogPageReadResult; + /* Read the next XLog record. Returns NULL on end-of-WAL or failure */ extern struct XLogRecord *XLogReadRecord(XLogReaderState *state, char **errormsg); +/* Consume the next record or error. */ +extern DecodedXLogRecord *XLogNextRecord(XLogReaderState *state, + char **errormsg); + +/* Release the previously returned record, if necessary. */ +extern void XLogReleasePreviousRecord(XLogReaderState *state); + +/* Try to read ahead, if there is data and space. */ +extern DecodedXLogRecord *XLogReadAhead(XLogReaderState *state, + bool nonblocking); + /* Validate a page */ extern bool XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, char *phdr); @@ -307,25 +390,36 @@ extern bool WALRead(XLogReaderState *state, /* Functions for decoding an XLogRecord */ -extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, +extern size_t DecodeXLogRecordRequiredSpace(size_t xl_tot_len); +extern bool DecodeXLogRecord(XLogReaderState *state, + DecodedXLogRecord *decoded, + XLogRecord *record, + XLogRecPtr lsn, char **errmsg); -#define XLogRecGetTotalLen(decoder) ((decoder)->decoded_record->xl_tot_len) -#define XLogRecGetPrev(decoder) ((decoder)->decoded_record->xl_prev) -#define XLogRecGetInfo(decoder) ((decoder)->decoded_record->xl_info) -#define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid) -#define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid) -#define XLogRecGetOrigin(decoder) ((decoder)->record_origin) -#define XLogRecGetTopXid(decoder) ((decoder)->toplevel_xid) -#define XLogRecGetData(decoder) ((decoder)->main_data) -#define XLogRecGetDataLen(decoder) ((decoder)->main_data_len) -#define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0) -#define XLogRecHasBlockRef(decoder, block_id) \ - ((decoder)->blocks[block_id].in_use) -#define XLogRecHasBlockImage(decoder, block_id) \ - ((decoder)->blocks[block_id].has_image) -#define XLogRecBlockImageApply(decoder, block_id) \ - ((decoder)->blocks[block_id].apply_image) +/* + * Macros that provide access to parts of the record most recently returned by + * XLogReadRecord() or XLogNextRecord(). + */ +#define XLogRecGetTotalLen(decoder) ((decoder)->record->header.xl_tot_len) +#define XLogRecGetPrev(decoder) ((decoder)->record->header.xl_prev) +#define XLogRecGetInfo(decoder) ((decoder)->record->header.xl_info) +#define XLogRecGetRmid(decoder) ((decoder)->record->header.xl_rmid) +#define XLogRecGetXid(decoder) ((decoder)->record->header.xl_xid) +#define XLogRecGetOrigin(decoder) ((decoder)->record->record_origin) +#define XLogRecGetTopXid(decoder) ((decoder)->record->toplevel_xid) +#define XLogRecGetData(decoder) ((decoder)->record->main_data) +#define XLogRecGetDataLen(decoder) ((decoder)->record->main_data_len) +#define XLogRecHasAnyBlockRefs(decoder) ((decoder)->record->max_block_id >= 0) +#define XLogRecMaxBlockId(decoder) ((decoder)->record->max_block_id) +#define XLogRecGetBlock(decoder, i) (&(decoder)->record->blocks[(i)]) +#define XLogRecHasBlockRef(decoder, block_id) \ + (((decoder)->record->max_block_id >= (block_id)) && \ + ((decoder)->record->blocks[block_id].in_use)) +#define XLogRecHasBlockImage(decoder, block_id) \ + ((decoder)->record->blocks[block_id].has_image) +#define XLogRecBlockImageApply(decoder, block_id) \ + ((decoder)->record->blocks[block_id].apply_image) #ifndef FRONTEND extern FullTransactionId XLogRecGetFullXid(XLogReaderState *record); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 6ffd4474bc..d8e228d89a 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -533,6 +533,7 @@ DeadLockState DeallocateStmt DeclareCursorStmt DecodedBkpBlock +DecodedXLogRecord DecodingOutputState DefElem DefElemAction @@ -2941,6 +2942,7 @@ XLogPageHeader XLogPageHeaderData XLogPageReadCB XLogPageReadPrivate +XLogPageReadResult XLogReaderRoutine XLogReaderState XLogRecData