diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c index 7cbce3783b..0454f79067 100644 --- a/src/backend/access/rmgrdesc/xlogdesc.c +++ b/src/backend/access/rmgrdesc/xlogdesc.c @@ -138,6 +138,16 @@ xlog_desc(StringInfo buf, XLogReaderState *record) xlrec.ThisTimeLineID, xlrec.PrevTimeLineID, timestamptz_to_str(xlrec.end_time)); } + else if (info == XLOG_OVERWRITE_CONTRECORD) + { + xl_overwrite_contrecord xlrec; + + memcpy(&xlrec, rec, sizeof(xl_overwrite_contrecord)); + appendStringInfo(buf, "lsn %X/%X; time %s", + (uint32) (xlrec.overwritten_lsn >> 32), + (uint32) xlrec.overwritten_lsn, + timestamptz_to_str(xlrec.overwrite_time)); + } } const char * @@ -177,6 +187,9 @@ xlog_identify(uint8 info) case XLOG_END_OF_RECOVERY: id = "END_OF_RECOVERY"; break; + case XLOG_OVERWRITE_CONTRECORD: + id = "OVERWRITE_CONTRECORD"; + break; case XLOG_FPI: id = "FPI"; break; diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index c0bb2f5c89..290278e0d3 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -202,6 +202,15 @@ static XLogRecPtr LastRec; static XLogRecPtr receivedUpto = 0; static TimeLineID receiveTLI = 0; +/* + * abortedRecPtr is the start pointer of a broken record at end of WAL when + * recovery completes; missingContrecPtr is the location of the first + * contrecord that went missing. See CreateOverwriteContrecordRecord for + * details. + */ +static XLogRecPtr abortedRecPtr; +static XLogRecPtr missingContrecPtr; + /* * During recovery, lastFullPageWrites keeps track of full_page_writes that * the replayed WAL records indicate. It's initialized with full_page_writes @@ -868,8 +877,11 @@ static void CheckRequiredParameterValues(void); static void XLogReportParameters(void); static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI, TimeLineID prevTLI); +static void VerifyOverwriteContrecord(xl_overwrite_contrecord *xlrec, + XLogReaderState *state); static void LocalSetXLogInsertAllowed(void); static void CreateEndOfRecoveryRecord(void); +static XLogRecPtr CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn); static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags); static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo); static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void); @@ -2208,6 +2220,18 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic) if (!Insert->forcePageWrites) NewPage->xlp_info |= XLP_BKP_REMOVABLE; + /* + * If a record was found to be broken at the end of recovery, and + * we're going to write on the page where its first contrecord was + * lost, set the XLP_FIRST_IS_OVERWRITE_CONTRECORD flag on the page + * header. See CreateOverwriteContrecordRecord(). + */ + if (missingContrecPtr == NewPageBeginPtr) + { + NewPage->xlp_info |= XLP_FIRST_IS_OVERWRITE_CONTRECORD; + missingContrecPtr = InvalidXLogRecPtr; + } + /* * If first page of an XLOG segment file, make it a long header. */ @@ -4244,6 +4268,19 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode, EndRecPtr = xlogreader->EndRecPtr; if (record == NULL) { + /* + * When not in standby mode we find that WAL ends in an incomplete + * record, keep track of that record. After recovery is done, + * we'll write a record to indicate downstream WAL readers that + * that portion is to be ignored. + */ + if (!StandbyMode && + !XLogRecPtrIsInvalid(xlogreader->abortedRecPtr)) + { + abortedRecPtr = xlogreader->abortedRecPtr; + missingContrecPtr = xlogreader->missingContrecPtr; + } + if (readFile >= 0) { close(readFile); @@ -6892,6 +6929,12 @@ StartupXLOG(void) InRecovery = true; } + /* + * Start recovery assuming that the final record isn't lost. + */ + abortedRecPtr = InvalidXLogRecPtr; + missingContrecPtr = InvalidXLogRecPtr; + /* REDO */ if (InRecovery) { @@ -7482,8 +7525,9 @@ StartupXLOG(void) /* * Kill WAL receiver, if it's still running, before we continue to write - * the startup checkpoint record. It will trump over the checkpoint and - * subsequent records if it's still alive when we start writing WAL. + * the startup checkpoint and aborted-contrecord records. It will trump + * over these records and subsequent ones if it's still alive when we + * start writing WAL. */ ShutdownWalRcv(); @@ -7516,8 +7560,12 @@ StartupXLOG(void) StandbyMode = false; /* - * Re-fetch the last valid or last applied record, so we can identify the - * exact endpoint of what we consider the valid portion of WAL. + * Determine where to start writing WAL next. + * + * When recovery ended in an incomplete record, write a WAL record about + * that and continue after it. In all other cases, re-fetch the last + * valid or last applied record, so we can identify the exact endpoint of + * what we consider the valid portion of WAL. */ record = ReadRecord(xlogreader, LastRec, PANIC, false); EndOfLog = EndRecPtr; @@ -7666,6 +7714,18 @@ StartupXLOG(void) XLogCtl->ThisTimeLineID = ThisTimeLineID; XLogCtl->PrevTimeLineID = PrevTimeLineID; + /* + * Actually, if WAL ended in an incomplete record, skip the parts that + * made it through and start writing after the portion that persisted. + * (It's critical to first write an OVERWRITE_CONTRECORD message, which + * we'll do as soon as we're open for writing new WAL.) + */ + if (!XLogRecPtrIsInvalid(missingContrecPtr)) + { + Assert(!XLogRecPtrIsInvalid(abortedRecPtr)); + EndOfLog = missingContrecPtr; + } + /* * Prepare to write WAL starting at EndOfLog location, and init xlog * buffer cache using the block containing the last record from the @@ -7718,13 +7778,23 @@ StartupXLOG(void) XLogCtl->LogwrtRqst.Write = EndOfLog; XLogCtl->LogwrtRqst.Flush = EndOfLog; + LocalSetXLogInsertAllowed(); + + /* If necessary, write overwrite-contrecord before doing anything else */ + if (!XLogRecPtrIsInvalid(abortedRecPtr)) + { + Assert(!XLogRecPtrIsInvalid(missingContrecPtr)); + CreateOverwriteContrecordRecord(abortedRecPtr); + abortedRecPtr = InvalidXLogRecPtr; + missingContrecPtr = InvalidXLogRecPtr; + } + /* * Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE * record before resource manager writes cleanup WAL records or checkpoint * record is written. */ Insert->fullPageWrites = lastFullPageWrites; - LocalSetXLogInsertAllowed(); UpdateFullPageWrites(); LocalXLogInsertAllowed = -1; @@ -9192,6 +9262,53 @@ CreateEndOfRecoveryRecord(void) LocalXLogInsertAllowed = -1; /* return to "check" state */ } +/* + * Write an OVERWRITE_CONTRECORD message. + * + * When on WAL replay we expect a continuation record at the start of a page + * that is not there, recovery ends and WAL writing resumes at that point. + * But it's wrong to resume writing new WAL back at the start of the record + * that was broken, because downstream consumers of that WAL (physical + * replicas) are not prepared to "rewind". So the first action after + * finishing replay of all valid WAL must be to write a record of this type + * at the point where the contrecord was missing; to support xlogreader + * detecting the special case, XLP_FIRST_IS_OVERWRITE_CONTRECORD is also added + * to the page header where the record occurs. xlogreader has an ad-hoc + * mechanism to report metadata about the broken record, which is what we + * use here. + * + * At replay time, XLP_FIRST_IS_OVERWRITE_CONTRECORD instructs xlogreader to + * skip the record it was reading, and pass back the LSN of the skipped + * record, so that its caller can verify (on "replay" of that record) that the + * XLOG_OVERWRITE_CONTRECORD matches what was effectively overwritten. + */ +static XLogRecPtr +CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn) +{ + xl_overwrite_contrecord xlrec; + XLogRecPtr recptr; + + /* sanity check */ + if (!RecoveryInProgress()) + elog(ERROR, "can only be used at end of recovery"); + + xlrec.overwritten_lsn = aborted_lsn; + xlrec.overwrite_time = GetCurrentTimestamp(); + + START_CRIT_SECTION(); + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, sizeof(xl_overwrite_contrecord)); + + recptr = XLogInsert(RM_XLOG_ID, XLOG_OVERWRITE_CONTRECORD); + + XLogFlush(recptr); + + END_CRIT_SECTION(); + + return recptr; +} + /* * Flush all data in shared memory to disk, and fsync * @@ -9992,6 +10109,13 @@ xlog_redo(XLogReaderState *record) RecoveryRestartPoint(&checkPoint); } + else if (info == XLOG_OVERWRITE_CONTRECORD) + { + xl_overwrite_contrecord xlrec; + + memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_overwrite_contrecord)); + VerifyOverwriteContrecord(&xlrec, record); + } else if (info == XLOG_END_OF_RECOVERY) { xl_end_of_recovery xlrec; @@ -10154,6 +10278,29 @@ xlog_redo(XLogReaderState *record) } } +/* + * Verify the payload of a XLOG_OVERWRITE_CONTRECORD record. + */ +static void +VerifyOverwriteContrecord(xl_overwrite_contrecord *xlrec, XLogReaderState *state) +{ + if (xlrec->overwritten_lsn != state->overwrittenRecPtr) + elog(FATAL, "mismatching overwritten LSN %X/%X -> %X/%X", + (uint32) (xlrec->overwritten_lsn >> 32), + (uint32) xlrec->overwritten_lsn, + (uint32) (state->overwrittenRecPtr >> 32), + (uint32) state->overwrittenRecPtr); + + ereport(LOG, + (errmsg("sucessfully skipped missing contrecord at %X/%X, overwritten at %s", + (uint32) (xlrec->overwritten_lsn >> 32), + (uint32) xlrec->overwritten_lsn, + timestamptz_to_str(xlrec->overwrite_time)))); + + /* Verifying the record should only happen once */ + state->overwrittenRecPtr = InvalidXLogRecPtr; +} + #ifdef WAL_DEBUG static void diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index f2cf3170e4..8e78013b7a 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -222,6 +222,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) total_len; uint32 targetRecOff; uint32 pageHeaderSize; + bool assembled; bool gotheader; int readOff; @@ -237,6 +238,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) state->errormsg_buf[0] = '\0'; ResetDecoder(state); + state->abortedRecPtr = InvalidXLogRecPtr; + state->missingContrecPtr = InvalidXLogRecPtr; if (RecPtr == InvalidXLogRecPtr) { @@ -265,7 +268,9 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) randAccess = true; } +restart: state->currRecPtr = RecPtr; + assembled = false; targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ); targetRecOff = RecPtr % XLOG_BLCKSZ; @@ -375,6 +380,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) char *buffer; uint32 gotlen; + assembled = true; /* Copy the first fragment of the record from the first page. */ memcpy(state->readRecordBuf, state->readBuf + RecPtr % XLOG_BLCKSZ, len); @@ -396,8 +402,25 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) Assert(SizeOfXLogShortPHD <= readOff); - /* Check that the continuation on next page looks valid */ pageHeader = (XLogPageHeader) state->readBuf; + + /* + * If we were expecting a continuation record and got an + * "overwrite contrecord" flag, that means the continuation record + * was overwritten with a different record. Restart the read by + * assuming the address to read is the location where we found + * this flag; but keep track of the LSN of the record we were + * reading, for later verification. + */ + if (pageHeader->xlp_info & XLP_FIRST_IS_OVERWRITE_CONTRECORD) + { + state->overwrittenRecPtr = state->currRecPtr; + ResetDecoder(state); + RecPtr = targetPagePtr; + goto restart; + } + + /* Check that the continuation on next page looks valid */ if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD)) { report_invalid_record(state, @@ -499,6 +522,20 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) return NULL; err: + if (assembled) + { + /* + * We get here when a record that spans multiple pages needs to be + * assembled, but something went wrong -- perhaps a contrecord piece + * was lost. If caller is WAL replay, it will know where the aborted + * record was and where to direct followup WAL to be written, marking + * the next piece with XLP_FIRST_IS_OVERWRITE_CONTRECORD, which will + * in turn signal downstream WAL consumers that the broken WAL record + * is to be ignored. + */ + state->abortedRecPtr = RecPtr; + state->missingContrecPtr = targetPagePtr; + } /* * Invalidate the read state. We might read from a different source after diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index 30610b3ea9..59e508d0e9 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -79,8 +79,10 @@ typedef XLogLongPageHeaderData *XLogLongPageHeader; #define XLP_LONG_HEADER 0x0002 /* This flag indicates backup blocks starting in this page are optional */ #define XLP_BKP_REMOVABLE 0x0004 +/* Replaces a missing contrecord; see CreateOverwriteContrecordRecord */ +#define XLP_FIRST_IS_OVERWRITE_CONTRECORD 0x0008 /* All defined flag bits in xlp_info (used for validity checking of header) */ -#define XLP_ALL_FLAGS 0x0007 +#define XLP_ALL_FLAGS 0x000F #define XLogPageHeaderSize(hdr) \ (((hdr)->xlp_info & XLP_LONG_HEADER) ? SizeOfXLogLongPHD : SizeOfXLogShortPHD) @@ -240,6 +242,13 @@ typedef struct xl_restore_point char rp_name[MAXFNAMELEN]; } xl_restore_point; +/* Overwrite of prior contrecord */ +typedef struct xl_overwrite_contrecord +{ + XLogRecPtr overwritten_lsn; + TimestampTz overwrite_time; +} xl_overwrite_contrecord; + /* End of recovery mark, when we don't do an END_OF_RECOVERY checkpoint */ typedef struct xl_end_of_recovery { diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 40116f8ecb..084463ccc3 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -191,6 +191,16 @@ struct XLogReaderState /* Buffer to hold error message */ char *errormsg_buf; + + /* + * Set at the end of recovery: the start point of a partial record at the + * end of WAL (InvalidXLogRecPtr if there wasn't one), and the start + * location of its first contrecord that went missing. + */ + XLogRecPtr abortedRecPtr; + XLogRecPtr missingContrecPtr; + /* Set when XLP_FIRST_IS_OVERWRITE_CONTRECORD is found */ + XLogRecPtr overwrittenRecPtr; }; /* Get a new XLogReader */ diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h index 358430249f..6f73784404 100644 --- a/src/include/catalog/pg_control.h +++ b/src/include/catalog/pg_control.h @@ -77,6 +77,7 @@ typedef struct CheckPoint #define XLOG_FPI_FOR_HINT 0xA0 #define XLOG_FPI 0xB0 #define XLOG_FPI_MULTI 0xC0 +#define XLOG_OVERWRITE_CONTRECORD 0xD0 /* diff --git a/src/test/recovery/t/026_overwrite_contrecord.pl b/src/test/recovery/t/026_overwrite_contrecord.pl new file mode 100644 index 0000000000..b990cdbe3e --- /dev/null +++ b/src/test/recovery/t/026_overwrite_contrecord.pl @@ -0,0 +1,96 @@ +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Tests for already-propagated WAL segments ending in incomplete WAL records. + +use strict; +use warnings; + +use FindBin; +use PostgresNode; +use TestLib; +use Test::More; + +plan tests => 3; + +# Test: Create a physical replica that's missing the last WAL file, +# then restart the primary to create a divergent WAL file and observe +# that the replica replays the "overwrite contrecord" from that new +# file. + +my $node = PostgresNode->get_new_node('primary'); +$node->init(allows_streaming => 1); +$node->append_conf('postgresql.conf', 'wal_keep_segments=16'); +$node->start; + +$node->safe_psql('postgres', 'create table filler (a int)'); +# First, measure how many bytes does the insertion of 1000 rows produce +my $start_lsn = + $node->safe_psql('postgres', q{select pg_current_wal_insert_lsn() - '0/0'}); +$node->safe_psql('postgres', + 'insert into filler select * from generate_series(1, 1000)'); +my $end_lsn = + $node->safe_psql('postgres', q{select pg_current_wal_insert_lsn() - '0/0'}); +my $rows_walsize = $end_lsn - $start_lsn; + +# Now consume all remaining room in the current WAL segment, leaving +# space enough only for the start of a largish record. +$node->safe_psql( + 'postgres', qq{ +WITH setting AS ( + SELECT setting::int AS wal_segsize + FROM pg_settings WHERE name = 'wal_segment_size' +) +INSERT INTO filler +SELECT g FROM setting, + generate_series(1, 1000 * (wal_segsize - ((pg_current_wal_insert_lsn() - '0/0') % wal_segsize)) / $rows_walsize) g +}); + +my $initfile = $node->safe_psql('postgres', + 'SELECT pg_walfile_name(pg_current_wal_insert_lsn())'); +$node->safe_psql('postgres', + qq{SELECT pg_logical_emit_message(true, 'test 026', repeat('xyzxz', 123456))} +); +#$node->safe_psql('postgres', qq{create table foo ()}); +my $endfile = $node->safe_psql('postgres', + 'SELECT pg_walfile_name(pg_current_wal_insert_lsn())'); +ok($initfile != $endfile, "$initfile differs from $endfile"); + +# Now stop abruptly, to avoid a stop checkpoint. We can remove the tail file +# afterwards, and on startup the large message should be overwritten with new +# contents +$node->stop('immediate'); + +unlink $node->basedir . "/pgdata/pg_wal/$endfile" + or die "could not unlink " . $node->basedir . "/pgdata/pg_wal/$endfile: $!"; + +# OK, create a standby at this spot. +$node->backup_fs_cold('backup'); +my $node_standby = PostgresNode->get_new_node('standby'); +$node_standby->init_from_backup($node, 'backup', has_streaming => 1); + +$node_standby->start; +$node->start; + +$node->safe_psql('postgres', + qq{create table foo (a text); insert into foo values ('hello')}); +$node->safe_psql('postgres', + qq{SELECT pg_logical_emit_message(true, 'test 026', 'AABBCC')}); + +my $until_lsn = $node->safe_psql('postgres', "SELECT pg_current_wal_lsn()"); +my $caughtup_query = + "SELECT '$until_lsn'::pg_lsn <= pg_last_wal_replay_lsn()"; +$node_standby->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for standby to catch up"; + +ok($node_standby->safe_psql('postgres', 'select * from foo') eq 'hello', + 'standby replays past overwritten contrecord'); + +# Verify message appears in standby's log +my $log = slurp_file($node_standby->logfile); +like( + $log, + qr[sucessfully skipped missing contrecord at], + "found log line in standby"); + +$node->stop; +$node_standby->stop; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index a49e779fcf..934e8b36f7 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -3302,6 +3302,7 @@ xl_logical_message xl_multi_insert_tuple xl_multixact_create xl_multixact_truncate +xl_overwrite_contrecord xl_parameter_change xl_relmap_update xl_replorigin_drop