diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 6f7ee0c947..5adf956f41 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1338,7 +1338,8 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) errmsg("out of memory"), errdetail("Failed while allocating a WAL reading processor."))); - record = XLogReadRecord(xlogreader, lsn, &errormsg); + XLogBeginRead(xlogreader, lsn); + record = XLogReadRecord(xlogreader, &errormsg); if (record == NULL) ereport(ERROR, (errcode_for_file_access(), diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 7f4f784c0e..882d5e8a73 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -897,7 +897,7 @@ static void UpdateLastRemovedPtr(char *filename); static void ValidateXLOGDirectoryStructure(void); static void CleanupBackupHistory(void); static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force); -static XLogRecord *ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, +static XLogRecord *ReadRecord(XLogReaderState *xlogreader, int emode, bool fetching_ckpt); static void CheckRecoveryConsistency(void); static XLogRecord *ReadCheckpointRecord(XLogReaderState *xlogreader, @@ -4246,17 +4246,17 @@ CleanupBackupHistory(void) } /* - * Attempt to read an XLOG record. + * Attempt to read the next XLOG record. * - * If RecPtr is valid, try to read a record at that position. Otherwise - * try to read a record just after the last one previously read. + * Before first call, the reader needs to be positioned to the first record + * by calling XLogBeginRead(). * * If no valid record is available, returns NULL, or fails if emode is PANIC. * (emode must be either PANIC, LOG). In standby mode, retries until a valid * record is available. 
*/ static XLogRecord * -ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode, +ReadRecord(XLogReaderState *xlogreader, int emode, bool fetching_ckpt) { XLogRecord *record; @@ -4265,7 +4265,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode, /* Pass through parameters to XLogPageRead */ private->fetching_ckpt = fetching_ckpt; private->emode = emode; - private->randAccess = (RecPtr != InvalidXLogRecPtr); + private->randAccess = (xlogreader->ReadRecPtr == InvalidXLogRecPtr); /* This is the first attempt to read this page. */ lastSourceFailed = false; @@ -4274,7 +4274,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode, { char *errormsg; - record = XLogReadRecord(xlogreader, RecPtr, &errormsg); + record = XLogReadRecord(xlogreader, &errormsg); ReadRecPtr = xlogreader->ReadRecPtr; EndRecPtr = xlogreader->EndRecPtr; if (record == NULL) @@ -4292,8 +4292,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode, * shouldn't loop anymore in that case. */ if (errormsg) - ereport(emode_for_corrupt_record(emode, - RecPtr ? RecPtr : EndRecPtr), + ereport(emode_for_corrupt_record(emode, EndRecPtr), (errmsg_internal("%s", errormsg) /* already translated */ )); } @@ -4311,8 +4310,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode, wal_segment_size); XLogFileName(fname, xlogreader->seg.ws_tli, segno, wal_segment_size); - ereport(emode_for_corrupt_record(emode, - RecPtr ? 
RecPtr : EndRecPtr), + ereport(emode_for_corrupt_record(emode, EndRecPtr), (errmsg("unexpected timeline ID %u in log segment %s, offset %u", xlogreader->latestPageTLI, fname, @@ -6427,7 +6425,8 @@ StartupXLOG(void) */ if (checkPoint.redo < checkPointLoc) { - if (!ReadRecord(xlogreader, checkPoint.redo, LOG, false)) + XLogBeginRead(xlogreader, checkPoint.redo); + if (!ReadRecord(xlogreader, LOG, false)) ereport(FATAL, (errmsg("could not find redo location referenced by checkpoint record"), errhint("If you are restoring from a backup, touch \"%s/recovery.signal\" and add required recovery options.\n" @@ -7034,12 +7033,13 @@ StartupXLOG(void) if (checkPoint.redo < RecPtr) { /* back up to find the record */ - record = ReadRecord(xlogreader, checkPoint.redo, PANIC, false); + XLogBeginRead(xlogreader, checkPoint.redo); + record = ReadRecord(xlogreader, PANIC, false); } else { /* just have to read next record after CheckPoint */ - record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false); + record = ReadRecord(xlogreader, LOG, false); } if (record != NULL) @@ -7263,7 +7263,7 @@ StartupXLOG(void) } /* Else, try to fetch the next WAL record */ - record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false); + record = ReadRecord(xlogreader, LOG, false); } while (record != NULL); /* @@ -7365,7 +7365,8 @@ StartupXLOG(void) * Re-fetch the last valid or last applied record, so we can identify the * exact endpoint of what we consider the valid portion of WAL. 
*/ - record = ReadRecord(xlogreader, LastRec, PANIC, false); + XLogBeginRead(xlogreader, LastRec); + record = ReadRecord(xlogreader, PANIC, false); EndOfLog = EndRecPtr; /* @@ -8094,7 +8095,8 @@ ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, return NULL; } - record = ReadRecord(xlogreader, RecPtr, LOG, true); + XLogBeginRead(xlogreader, RecPtr); + record = ReadRecord(xlogreader, LOG, true); if (record == NULL) { diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 3aa68127a3..32f02256ed 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -218,11 +218,34 @@ WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, snprintf(segcxt->ws_dir, MAXPGPATH, "%s", waldir); } +/* + * Begin reading WAL at 'RecPtr'. + * + * 'RecPtr' should point to the beginning of a valid WAL record. Pointing at + * the beginning of a page is also OK, if there is a new record right after + * the page header, i.e. not a continuation. + * + * This does not make any attempt to read the WAL yet, and hence cannot fail. + * If the starting address is not correct, the first call to XLogReadRecord() + * will error out. + */ +void +XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) +{ + Assert(!XLogRecPtrIsInvalid(RecPtr)); + + ResetDecoder(state); + + /* Begin at the passed-in record pointer. */ + state->EndRecPtr = RecPtr; + state->ReadRecPtr = InvalidXLogRecPtr; +} + /* * Attempt to read an XLOG record. * - * If RecPtr is valid, try to read a record at that position. Otherwise - * try to read a record just after the last one previously read. + * XLogBeginRead() or XLogFindNextRecord() must be called before the first call + * to XLogReadRecord(). * * If the read_page callback fails to read the requested data, NULL is * returned. 
The callback is expected to have reported the error; errormsg @@ -235,8 +258,9 @@ WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, * valid until the next call to XLogReadRecord. */ XLogRecord * -XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) +XLogReadRecord(XLogReaderState *state, char **errormsg) { + XLogRecPtr RecPtr; XLogRecord *record; XLogRecPtr targetPagePtr; bool randAccess; @@ -260,19 +284,17 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) ResetDecoder(state); - if (RecPtr == InvalidXLogRecPtr) - { - /* No explicit start point; read the record after the one we just read */ - RecPtr = state->EndRecPtr; + RecPtr = state->EndRecPtr; - if (state->ReadRecPtr == InvalidXLogRecPtr) - randAccess = true; + if (state->ReadRecPtr != InvalidXLogRecPtr) + { + /* read the record after the one we just read */ /* - * RecPtr is pointing to end+1 of the previous WAL record. If we're - * at a page boundary, no more records can fit on the current page. We - * must skip over the page header, but we can't do that until we've - * read in the page, since the header size is variable. + * EndRecPtr is pointing to end+1 of the previous WAL record. If + * we're at a page boundary, no more records can fit on the current + * page. We must skip over the page header, but we can't do that until + * we've read in the page, since the header size is variable. */ } else @@ -280,8 +302,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) /* * Caller supplied a position to start at. * - * In this case, the passed-in record pointer should already be - * pointing to a valid record starting position. + * In this case, EndRecPtr should already be pointing to a valid + * record starting position. */ Assert(XRecOffIsValid(RecPtr)); randAccess = true; @@ -899,14 +921,17 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, /* * Find the first record with an lsn >= RecPtr. 
* - * Useful for checking whether RecPtr is a valid xlog address for reading, and - * to find the first valid address after some address when dumping records for - * debugging purposes. + * This is different from XLogBeginRead() in that RecPtr doesn't need to point + * to a valid record boundary. Useful for checking whether RecPtr is a valid + * xlog address for reading, and to find the first valid address after some + * address when dumping records for debugging purposes. + * + * This positions the reader, like XLogBeginRead(), so that the next call to + * XLogReadRecord() will read the next valid record. */ XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) { - XLogReaderState saved_state = *state; XLogRecPtr tmpRecPtr; XLogRecPtr found = InvalidXLogRecPtr; XLogPageHeader header; @@ -991,27 +1016,23 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) * because either we're at the first record after the beginning of a page * or we just jumped over the remaining data of a continuation. */ - while (XLogReadRecord(state, tmpRecPtr, &errormsg) != NULL) + XLogBeginRead(state, tmpRecPtr); + while (XLogReadRecord(state, &errormsg) != NULL) { - /* continue after the record */ - tmpRecPtr = InvalidXLogRecPtr; - /* past the record we've found, break out */ if (RecPtr <= state->ReadRecPtr) { + /* Rewind the reader to the beginning of the last record. 
*/ found = state->ReadRecPtr; - goto out; + XLogBeginRead(state, found); + return found; } } err: -out: - /* Reset state to what we had before finding the record */ - state->ReadRecPtr = saved_state.ReadRecPtr; - state->EndRecPtr = saved_state.EndRecPtr; XLogReaderInvalReadState(state); - return found; + return InvalidXLogRecPtr; } #endif /* FRONTEND */ diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index bdf4389a57..cf93200618 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -461,11 +461,10 @@ DecodingContextReady(LogicalDecodingContext *ctx) void DecodingContextFindStartpoint(LogicalDecodingContext *ctx) { - XLogRecPtr startptr; ReplicationSlot *slot = ctx->slot; /* Initialize from where to start reading WAL. */ - startptr = slot->data.restart_lsn; + XLogBeginRead(ctx->reader, slot->data.restart_lsn); elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X", (uint32) (slot->data.restart_lsn >> 32), @@ -478,14 +477,12 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) char *err = NULL; /* the read_page callback waits for new WAL */ - record = XLogReadRecord(ctx->reader, startptr, &err); + record = XLogReadRecord(ctx->reader, &err); if (err) elog(ERROR, "%s", err); if (!record) elog(ERROR, "no record found"); /* shouldn't happen */ - startptr = InvalidXLogRecPtr; - LogicalDecodingProcessRecord(ctx, ctx->reader); /* only continue till we found a consistent spot */ diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 7693c98949..25b89e5616 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -127,7 +127,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin MemoryContext per_query_ctx; MemoryContext oldcontext; XLogRecPtr end_of_wal; - XLogRecPtr startptr; 
LogicalDecodingContext *ctx; ResourceOwner old_resowner = CurrentResourceOwner; ArrayType *arr; @@ -269,28 +268,21 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin * xacts that committed after the slot's confirmed_flush can be * accumulated into reorder buffers. */ - startptr = MyReplicationSlot->data.restart_lsn; + XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn); /* invalidate non-timetravel entries */ InvalidateSystemCaches(); /* Decode until we run out of records */ - while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) || - (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal)) + while (ctx->reader->EndRecPtr < end_of_wal) { XLogRecord *record; char *errm = NULL; - record = XLogReadRecord(ctx->reader, startptr, &errm); + record = XLogReadRecord(ctx->reader, &errm); if (errm) elog(ERROR, "%s", errm); - /* - * Now that we've set up the xlog reader state, subsequent calls - * pass InvalidXLogRecPtr to say "continue from last record" - */ - startptr = InvalidXLogRecPtr; - /* * The {begin_txn,change,commit_txn}_wrapper callbacks above will * store the description into our tuplestore. diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index bb69683e2a..7c89694611 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -391,7 +391,6 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) { LogicalDecodingContext *ctx; ResourceOwner old_resowner = CurrentResourceOwner; - XLogRecPtr startlsn; XLogRecPtr retlsn; PG_TRY(); @@ -411,7 +410,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) * Start reading at the slot's restart_lsn, which we know to point to * a valid record. 
*/ - startlsn = MyReplicationSlot->data.restart_lsn; + XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn); /* Initialize our return value in case we don't do anything */ retlsn = MyReplicationSlot->data.confirmed_flush; @@ -420,10 +419,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) InvalidateSystemCaches(); /* Decode at least one record, until we run out of records */ - while ((!XLogRecPtrIsInvalid(startlsn) && - startlsn < moveto) || - (!XLogRecPtrIsInvalid(ctx->reader->EndRecPtr) && - ctx->reader->EndRecPtr < moveto)) + while (ctx->reader->EndRecPtr < moveto) { char *errm = NULL; XLogRecord *record; @@ -432,13 +428,10 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) * Read records. No changes are generated in fast_forward mode, * but snapbuilder/slot statuses are updated properly. */ - record = XLogReadRecord(ctx->reader, startlsn, &errm); + record = XLogReadRecord(ctx->reader, &errm); if (errm) elog(ERROR, "%s", errm); - /* Read sequentially from now on */ - startlsn = InvalidXLogRecPtr; - /* * Process the record. Storage-level changes are ignored in * fast_forward mode, but other modules (such as snapbuilder) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 9c063749b6..0c65f1660b 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -191,7 +191,6 @@ static volatile sig_atomic_t got_STOPPING = false; static volatile sig_atomic_t replication_active = false; static LogicalDecodingContext *logical_decoding_ctx = NULL; -static XLogRecPtr logical_startptr = InvalidXLogRecPtr; /* A sample associating a WAL location with the time it was written. */ typedef struct @@ -1130,9 +1129,9 @@ StartLogicalReplication(StartReplicationCmd *cmd) pq_endmessage(&buf); pq_flush(); - /* Start reading WAL from the oldest required WAL. 
*/ - logical_startptr = MyReplicationSlot->data.restart_lsn; + XLogBeginRead(logical_decoding_ctx->reader, + MyReplicationSlot->data.restart_lsn); /* * Report the location after which we'll send out further commits as the @@ -2791,8 +2790,7 @@ XLogSendLogical(void) */ WalSndCaughtUp = false; - record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm); - logical_startptr = InvalidXLogRecPtr; + record = XLogReadRecord(logical_decoding_ctx->reader, &errm); /* xlog record was invalid */ if (errm != NULL) diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index b6429827cf..eb61cb8803 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -68,15 +68,14 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex, if (xlogreader == NULL) pg_fatal("out of memory"); + XLogBeginRead(xlogreader, startpoint); do { - record = XLogReadRecord(xlogreader, startpoint, &errormsg); + record = XLogReadRecord(xlogreader, &errormsg); if (record == NULL) { - XLogRecPtr errptr; - - errptr = startpoint ? 
startpoint : xlogreader->EndRecPtr; + XLogRecPtr errptr = xlogreader->EndRecPtr; if (errormsg) pg_fatal("could not read WAL record at %X/%X: %s", @@ -89,8 +88,6 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex, extractPageInfo(xlogreader); - startpoint = InvalidXLogRecPtr; /* continue reading at next record */ - } while (xlogreader->ReadRecPtr != endpoint); XLogReaderFree(xlogreader); @@ -120,7 +117,8 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex) if (xlogreader == NULL) pg_fatal("out of memory"); - record = XLogReadRecord(xlogreader, ptr, &errormsg); + XLogBeginRead(xlogreader, ptr); + record = XLogReadRecord(xlogreader, &errormsg); if (record == NULL) { if (errormsg) @@ -182,7 +180,8 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, { uint8 info; - record = XLogReadRecord(xlogreader, searchptr, &errormsg); + XLogBeginRead(xlogreader, searchptr); + record = XLogReadRecord(xlogreader, &errormsg); if (record == NULL) { diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index 23f3518c2e..83202b5b87 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -1053,7 +1053,7 @@ main(int argc, char **argv) for (;;) { /* try to read the next record */ - record = XLogReadRecord(xlogreader_state, first_record, &errormsg); + record = XLogReadRecord(xlogreader_state, &errormsg); if (!record) { if (!config.follow || private.endptr_reached) @@ -1065,9 +1065,6 @@ main(int argc, char **argv) } } - /* after reading the first record, continue at next one */ - first_record = InvalidXLogRecPtr; - /* apply all specified filters */ if (config.filter_by_rmgr != -1 && config.filter_by_rmgr != record->xl_rmid) diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index f7cc8c4e1d..4582196e18 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -13,7 +13,9 @@ * how to use the XLogReader infrastructure. 
* * The basic idea is to allocate an XLogReaderState via - * XLogReaderAllocate(), and call XLogReadRecord() until it returns NULL. + * XLogReaderAllocate(), position the reader to the first record with + * XLogBeginRead() or XLogFindNextRecord(), and call XLogReadRecord() + * until it returns NULL. * * After reading a record with XLogReadRecord(), it's decomposed into * the per-block and main data parts, and the parts can be accessed @@ -126,7 +128,8 @@ struct XLogReaderState /* * Start and end point of last record read. EndRecPtr is also used as the - * position to read next, if XLogReadRecord receives an invalid recptr. + * position to read next. Calling XLogBeginRead() sets EndRecPtr to the + * starting position and ReadRecPtr to invalid. */ XLogRecPtr ReadRecPtr; /* start of last record read */ XLogRecPtr EndRecPtr; /* end+1 of last record read */ @@ -239,18 +242,20 @@ typedef int (*WALSegmentOpen) (XLogSegNo nextSegNo, WALSegmentContext *segcxt, extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, int segsize, const char *waldir); +/* Position the XLogReader to given record */ +extern void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr); +#ifdef FRONTEND +extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr); +#endif /* FRONTEND */ + /* Read the next XLog record. Returns NULL on end-of-WAL or failure */ extern struct XLogRecord *XLogReadRecord(XLogReaderState *state, - XLogRecPtr recptr, char **errormsg); + char **errormsg); /* Validate a page */ extern bool XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, char *phdr); -#ifdef FRONTEND -extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr); -#endif /* FRONTEND */ - /* * Error information from WALRead that both backend and frontend caller can * process. Currently only errors from pg_pread can be reported.