Fix WAL replay in presence of an incomplete record

Physical replication always ships WAL segment files to replicas once
they are complete.  This is a problem if one WAL record is split across
a segment boundary and the primary server crashes before writing down
the segment with the next portion of the WAL record: WAL writing after
crash recovery would happily resume at the point where the broken record
started, overwriting that record ... but any standby or backup may have
already received a copy of that segment, and they are not rewinding.
This causes standbys to stop following the primary after the latter
crashes:
  LOG:  invalid contrecord length 7262 at A8/D9FFFBC8
because the standby is still trying to read the continuation record
(contrecord) for the original long WAL record, but it is not there and
it will never be.  A workaround is to stop the replica, delete the WAL
file, and restart it -- at which point a fresh copy is brought over from
the primary.  But that's pretty labor intensive, and I bet many users
would just give up and re-clone the standby instead.

A fix for this problem was already attempted in commit 515e3d84a0, but
it only addressed the case for the scenario of WAL archiving, so
streaming replication would still be a problem (as well as other things
such as taking a filesystem-level backup while the server is down after
having crashed), and it had performance scalability problems too; so it
had to be reverted.

This commit fixes the problem using an approach suggested by Andres
Freund, whereby the initial portion(s) of the split-up WAL record are
kept, and a special type of WAL record is written where the contrecord
was lost, so that WAL replay in the replica knows to skip the broken
parts.  With this approach, we can continue to stream/archive segment
files as soon as they are complete, and replay of the broken records
will proceed across the crash point without a hitch.

Because a new type of WAL record is added, users should be careful to
upgrade standbys first, primaries later. Otherwise they risk the standby
being unable to start if the primary happens to write such a record.

A new TAP test that exercises this is added, but the portability of it
is yet to be seen.

This has been wrong since the introduction of physical replication, so
backpatch all the way back.  In stable branches, keep the new
XLogReaderState members at the end of the struct, to avoid an ABI
break.

Author: Álvaro Herrera <alvherre@alvh.no-ip.org>
Reviewed-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Reviewed-by: Nathan Bossart <bossartn@amazon.com>
Discussion: https://postgr.es/m/202108232252.dh7uxf6oxwcy@alvherre.pgsql
This commit is contained in:
Alvaro Herrera 2021-09-29 11:21:51 -03:00
parent 55367378d7
commit 148c6ee3be
No known key found for this signature in database
GPG Key ID: 1C20ACB9D5C564AE
8 changed files with 342 additions and 8 deletions

View File

@ -138,6 +138,16 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
xlrec.ThisTimeLineID, xlrec.PrevTimeLineID,
timestamptz_to_str(xlrec.end_time));
}
else if (info == XLOG_OVERWRITE_CONTRECORD)
{
xl_overwrite_contrecord xlrec;
memcpy(&xlrec, rec, sizeof(xl_overwrite_contrecord));
appendStringInfo(buf, "lsn %X/%X; time %s",
(uint32) (xlrec.overwritten_lsn >> 32),
(uint32) xlrec.overwritten_lsn,
timestamptz_to_str(xlrec.overwrite_time));
}
}
const char *
@ -177,6 +187,9 @@ xlog_identify(uint8 info)
case XLOG_END_OF_RECOVERY:
id = "END_OF_RECOVERY";
break;
case XLOG_OVERWRITE_CONTRECORD:
id = "OVERWRITE_CONTRECORD";
break;
case XLOG_FPI:
id = "FPI";
break;

View File

@ -195,6 +195,15 @@ static XLogRecPtr LastRec;
static XLogRecPtr receivedUpto = 0;
static TimeLineID receiveTLI = 0;
/*
* abortedRecPtr is the start pointer of a broken record at end of WAL when
* recovery completes; missingContrecPtr is the location of the first
* contrecord that went missing. See CreateOverwriteContrecordRecord for
* details.
*/
static XLogRecPtr abortedRecPtr;
static XLogRecPtr missingContrecPtr;
/*
* During recovery, lastFullPageWrites keeps track of full_page_writes that
* the replayed WAL records indicate. It's initialized with full_page_writes
@ -838,8 +847,11 @@ static void CheckRequiredParameterValues(void);
static void XLogReportParameters(void);
static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI,
TimeLineID prevTLI);
static void VerifyOverwriteContrecord(xl_overwrite_contrecord *xlrec,
XLogReaderState *state);
static void LocalSetXLogInsertAllowed(void);
static void CreateEndOfRecoveryRecord(void);
static XLogRecPtr CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn);
static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
@ -2029,6 +2041,18 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
if (!Insert->forcePageWrites)
NewPage->xlp_info |= XLP_BKP_REMOVABLE;
/*
* If a record was found to be broken at the end of recovery, and
* we're going to write on the page where its first contrecord was
* lost, set the XLP_FIRST_IS_OVERWRITE_CONTRECORD flag on the page
* header. See CreateOverwriteContrecordRecord().
*/
if (missingContrecPtr == NewPageBeginPtr)
{
NewPage->xlp_info |= XLP_FIRST_IS_OVERWRITE_CONTRECORD;
missingContrecPtr = InvalidXLogRecPtr;
}
/*
* If first page of an XLOG segment file, make it a long header.
*/
@ -4073,6 +4097,19 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
EndRecPtr = xlogreader->EndRecPtr;
if (record == NULL)
{
/*
* When not in standby mode we find that WAL ends in an incomplete
* record, keep track of that record. After recovery is done,
* we'll write a record to indicate downstream WAL readers that
* that portion is to be ignored.
*/
if (!StandbyMode &&
!XLogRecPtrIsInvalid(xlogreader->abortedRecPtr))
{
abortedRecPtr = xlogreader->abortedRecPtr;
missingContrecPtr = xlogreader->missingContrecPtr;
}
if (readFile >= 0)
{
close(readFile);
@ -6571,6 +6608,12 @@ StartupXLOG(void)
InRecovery = true;
}
/*
* Start recovery assuming that the final record isn't lost.
*/
abortedRecPtr = InvalidXLogRecPtr;
missingContrecPtr = InvalidXLogRecPtr;
/* REDO */
if (InRecovery)
{
@ -7153,8 +7196,9 @@ StartupXLOG(void)
/*
* Kill WAL receiver, if it's still running, before we continue to write
* the startup checkpoint record. It will trump over the checkpoint and
* subsequent records if it's still alive when we start writing WAL.
* the startup checkpoint and aborted-contrecord records. It will trump
* over these records and subsequent ones if it's still alive when we
* start writing WAL.
*/
ShutdownWalRcv();
@ -7187,8 +7231,12 @@ StartupXLOG(void)
StandbyMode = false;
/*
* Re-fetch the last valid or last applied record, so we can identify the
* exact endpoint of what we consider the valid portion of WAL.
* Determine where to start writing WAL next.
*
* When recovery ended in an incomplete record, write a WAL record about
* that and continue after it. In all other cases, re-fetch the last
* valid or last applied record, so we can identify the exact endpoint of
* what we consider the valid portion of WAL.
*/
record = ReadRecord(xlogreader, LastRec, PANIC, false);
EndOfLog = EndRecPtr;
@ -7332,7 +7380,19 @@ StartupXLOG(void)
XLogCtl->PrevTimeLineID = PrevTimeLineID;
/*
* Prepare to write WAL starting at EndOfLog position, and init xlog
* Actually, if WAL ended in an incomplete record, skip the parts that
* made it through and start writing after the portion that persisted.
* (It's critical to first write an OVERWRITE_CONTRECORD message, which
* we'll do as soon as we're open for writing new WAL.)
*/
if (!XLogRecPtrIsInvalid(missingContrecPtr))
{
Assert(!XLogRecPtrIsInvalid(abortedRecPtr));
EndOfLog = missingContrecPtr;
}
/*
* Prepare to write WAL starting at EndOfLog location, and init xlog
* buffer cache using the block containing the last record from the
* previous incarnation.
*/
@ -7383,13 +7443,23 @@ StartupXLOG(void)
XLogCtl->LogwrtRqst.Write = EndOfLog;
XLogCtl->LogwrtRqst.Flush = EndOfLog;
LocalSetXLogInsertAllowed();
/* If necessary, write overwrite-contrecord before doing anything else */
if (!XLogRecPtrIsInvalid(abortedRecPtr))
{
Assert(!XLogRecPtrIsInvalid(missingContrecPtr));
CreateOverwriteContrecordRecord(abortedRecPtr);
abortedRecPtr = InvalidXLogRecPtr;
missingContrecPtr = InvalidXLogRecPtr;
}
/*
* Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE
* record before resource manager writes cleanup WAL records or checkpoint
* record is written.
*/
Insert->fullPageWrites = lastFullPageWrites;
LocalSetXLogInsertAllowed();
UpdateFullPageWrites();
LocalXLogInsertAllowed = -1;
@ -8826,6 +8896,53 @@ CreateEndOfRecoveryRecord(void)
LocalXLogInsertAllowed = -1; /* return to "check" state */
}
/*
* Write an OVERWRITE_CONTRECORD message.
*
* When on WAL replay we expect a continuation record at the start of a page
* that is not there, recovery ends and WAL writing resumes at that point.
* But it's wrong to resume writing new WAL back at the start of the record
* that was broken, because downstream consumers of that WAL (physical
* replicas) are not prepared to "rewind". So the first action after
* finishing replay of all valid WAL must be to write a record of this type
* at the point where the contrecord was missing; to support xlogreader
* detecting the special case, XLP_FIRST_IS_OVERWRITE_CONTRECORD is also added
* to the page header where the record occurs. xlogreader has an ad-hoc
* mechanism to report metadata about the broken record, which is what we
* use here.
*
* At replay time, XLP_FIRST_IS_OVERWRITE_CONTRECORD instructs xlogreader to
* skip the record it was reading, and pass back the LSN of the skipped
* record, so that its caller can verify (on "replay" of that record) that the
* XLOG_OVERWRITE_CONTRECORD matches what was effectively overwritten.
*/
static XLogRecPtr
CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn)
{
xl_overwrite_contrecord xlrec;
XLogRecPtr recptr;
/* sanity check */
if (!RecoveryInProgress())
elog(ERROR, "can only be used at end of recovery");
xlrec.overwritten_lsn = aborted_lsn;
xlrec.overwrite_time = GetCurrentTimestamp();
START_CRIT_SECTION();
XLogBeginInsert();
XLogRegisterData((char *) &xlrec, sizeof(xl_overwrite_contrecord));
recptr = XLogInsert(RM_XLOG_ID, XLOG_OVERWRITE_CONTRECORD);
XLogFlush(recptr);
END_CRIT_SECTION();
return recptr;
}
/*
* Flush all data in shared memory to disk, and fsync
*
@ -9622,6 +9739,13 @@ xlog_redo(XLogReaderState *record)
RecoveryRestartPoint(&checkPoint);
}
else if (info == XLOG_OVERWRITE_CONTRECORD)
{
xl_overwrite_contrecord xlrec;
memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_overwrite_contrecord));
VerifyOverwriteContrecord(&xlrec, record);
}
else if (info == XLOG_END_OF_RECOVERY)
{
xl_end_of_recovery xlrec;
@ -9784,6 +9908,29 @@ xlog_redo(XLogReaderState *record)
}
}
/*
* Verify the payload of a XLOG_OVERWRITE_CONTRECORD record.
*/
static void
VerifyOverwriteContrecord(xl_overwrite_contrecord *xlrec, XLogReaderState *state)
{
if (xlrec->overwritten_lsn != state->overwrittenRecPtr)
elog(FATAL, "mismatching overwritten LSN %X/%X -> %X/%X",
(uint32) (xlrec->overwritten_lsn >> 32),
(uint32) xlrec->overwritten_lsn,
(uint32) (state->overwrittenRecPtr >> 32),
(uint32) state->overwrittenRecPtr);
ereport(LOG,
(errmsg("sucessfully skipped missing contrecord at %X/%X, overwritten at %s",
(uint32) (xlrec->overwritten_lsn >> 32),
(uint32) xlrec->overwritten_lsn,
timestamptz_to_str(xlrec->overwrite_time))));
/* Verifying the record should only happen once */
state->overwrittenRecPtr = InvalidXLogRecPtr;
}
#ifdef WAL_DEBUG
static void

View File

@ -220,6 +220,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
total_len;
uint32 targetRecOff;
uint32 pageHeaderSize;
bool assembled;
bool gotheader;
int readOff;
@ -235,6 +236,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
state->errormsg_buf[0] = '\0';
ResetDecoder(state);
state->abortedRecPtr = InvalidXLogRecPtr;
state->missingContrecPtr = InvalidXLogRecPtr;
if (RecPtr == InvalidXLogRecPtr)
{
@ -263,7 +266,9 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
randAccess = true;
}
restart:
state->currRecPtr = RecPtr;
assembled = false;
targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
targetRecOff = RecPtr % XLOG_BLCKSZ;
@ -373,6 +378,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
char *buffer;
uint32 gotlen;
assembled = true;
/* Copy the first fragment of the record from the first page. */
memcpy(state->readRecordBuf,
state->readBuf + RecPtr % XLOG_BLCKSZ, len);
@ -394,8 +400,25 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
Assert(SizeOfXLogShortPHD <= readOff);
/* Check that the continuation on next page looks valid */
pageHeader = (XLogPageHeader) state->readBuf;
/*
* If we were expecting a continuation record and got an
* "overwrite contrecord" flag, that means the continuation record
* was overwritten with a different record. Restart the read by
* assuming the address to read is the location where we found
* this flag; but keep track of the LSN of the record we were
* reading, for later verification.
*/
if (pageHeader->xlp_info & XLP_FIRST_IS_OVERWRITE_CONTRECORD)
{
state->overwrittenRecPtr = state->currRecPtr;
ResetDecoder(state);
RecPtr = targetPagePtr;
goto restart;
}
/* Check that the continuation on next page looks valid */
if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
{
report_invalid_record(state,
@ -496,6 +519,20 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
return NULL;
err:
if (assembled)
{
/*
* We get here when a record that spans multiple pages needs to be
* assembled, but something went wrong -- perhaps a contrecord piece
* was lost. If caller is WAL replay, it will know where the aborted
* record was and where to direct followup WAL to be written, marking
* the next piece with XLP_FIRST_IS_OVERWRITE_CONTRECORD, which will
* in turn signal downstream WAL consumers that the broken WAL record
* is to be ignored.
*/
state->abortedRecPtr = RecPtr;
state->missingContrecPtr = targetPagePtr;
}
/*
* Invalidate the read state. We might read from a different source after

View File

@ -79,8 +79,10 @@ typedef XLogLongPageHeaderData *XLogLongPageHeader;
#define XLP_LONG_HEADER 0x0002
/* This flag indicates backup blocks starting in this page are optional */
#define XLP_BKP_REMOVABLE 0x0004
/* Replaces a missing contrecord; see CreateOverwriteContrecordRecord */
#define XLP_FIRST_IS_OVERWRITE_CONTRECORD 0x0008
/* All defined flag bits in xlp_info (used for validity checking of header) */
#define XLP_ALL_FLAGS 0x0007
#define XLP_ALL_FLAGS 0x000F
#define XLogPageHeaderSize(hdr) \
(((hdr)->xlp_info & XLP_LONG_HEADER) ? SizeOfXLogLongPHD : SizeOfXLogShortPHD)
@ -226,6 +228,13 @@ typedef struct xl_restore_point
char rp_name[MAXFNAMELEN];
} xl_restore_point;
/* Overwrite of prior contrecord */
typedef struct xl_overwrite_contrecord
{
XLogRecPtr overwritten_lsn;
TimestampTz overwrite_time;
} xl_overwrite_contrecord;
/* End of recovery mark, when we don't do an END_OF_RECOVERY checkpoint */
typedef struct xl_end_of_recovery
{

View File

@ -167,6 +167,16 @@ struct XLogReaderState
/* Buffer to hold error message */
char *errormsg_buf;
/*
* Set at the end of recovery: the start point of a partial record at the
* end of WAL (InvalidXLogRecPtr if there wasn't one), and the start
* location of its first contrecord that went missing.
*/
XLogRecPtr abortedRecPtr;
XLogRecPtr missingContrecPtr;
/* Set when XLP_FIRST_IS_OVERWRITE_CONTRECORD is found */
XLogRecPtr overwrittenRecPtr;
};
/* Get a new XLogReader */

View File

@ -74,6 +74,7 @@ typedef struct CheckPoint
#define XLOG_FPI_FOR_HINT 0xA0
#define XLOG_FPI 0xB0
#define XLOG_FPI_MULTI 0xC0
#define XLOG_OVERWRITE_CONTRECORD 0xD0
/*

View File

@ -0,0 +1,116 @@
# Copyright (c) 2021, PostgreSQL Global Development Group
# Tests for already-propagated WAL segments ending in incomplete WAL records.
use strict;
use warnings;
use FindBin;
use PostgresNode;
use TestLib;
use Test::More;
plan tests => 3;
# Test: Create a physical replica that's missing the last WAL file,
# then restart the primary to create a divergent WAL file and observe
# that the replica replays the "overwrite contrecord" from that new
# file.
my $node = PostgresNode->get_new_node('primary');
$node->init(allows_streaming => 1);
$node->append_conf('postgresql.conf', 'wal_keep_segments=16');
$node->start;
$node->safe_psql('postgres', 'create table filler (a int)');
# First, measure how many bytes does the insertion of 1000 rows produce
my $start_lsn = $node->safe_psql('postgres',
q{select pg_current_xlog_insert_location() - '0/0'});
$node->safe_psql('postgres',
'insert into filler select * from generate_series(1, 1000)');
my $end_lsn = $node->safe_psql('postgres',
q{select pg_current_xlog_insert_location() - '0/0'});
my $rows_walsize = $end_lsn - $start_lsn;
note "rows walsize $rows_walsize";
note "before fill ",
$node->safe_psql('postgres', 'select pg_current_xlog_insert_location()');
# Now consume all remaining room in the current WAL segment, leaving
# space enough only for the start of a largish record.
$node->safe_psql(
'postgres', qq{
WITH segsize AS (
SELECT setting::int
FROM pg_settings WHERE name = 'wal_segment_size'
), walblksz AS (
SELECT setting::int
FROM pg_settings WHERE name = 'wal_block_size'
), setting AS (
SELECT segsize.setting * walblksz.setting AS wal_segsize
FROM segsize, walblksz
)
INSERT INTO filler
SELECT g FROM setting,
generate_series(1, 1000 * (wal_segsize - ((pg_current_xlog_insert_location() - '0/0') % wal_segsize)) / $rows_walsize) g
});
note "start ",
$node->safe_psql('postgres', 'select pg_current_xlog_insert_location()');
my $initfile = $node->safe_psql('postgres',
'SELECT pg_xlogfile_name(pg_current_xlog_insert_location())');
$node->safe_psql('postgres',
qq{SELECT pg_logical_emit_message(true, 'test 026', repeat('xyzxz', 123456))}
);
#$node->safe_psql('postgres', qq{create table foo ()});
sleep 1;
my $endfile = $node->safe_psql('postgres',
'SELECT pg_xlogfile_name(pg_current_xlog_insert_location())');
note "end: ",
$node->safe_psql('postgres', 'select pg_current_xlog_insert_location()');
ok($initfile != $endfile, "$initfile differs from $endfile");
# Now stop abruptly, to avoid a stop checkpoint. We can remove the tail file
# afterwards, and on startup the large message should be overwritten with new
# contents
$node->stop('immediate');
unlink $node->basedir . "/pgdata/pg_xlog/$endfile"
or die "could not unlink "
. $node->basedir
. "/pgdata/pg_xlog/$endfile: $!";
# OK, create a standby at this spot.
$node->backup_fs_cold('backup');
my $node_standby = PostgresNode->get_new_node('standby');
$node_standby->init_from_backup($node, 'backup', has_streaming => 1);
$node_standby->start;
$node->start;
$node->safe_psql('postgres',
qq{create table foo (a text); insert into foo values ('hello')});
$node->safe_psql('postgres',
qq{SELECT pg_logical_emit_message(true, 'test 026', 'AABBCC')});
my $until_lsn =
$node->safe_psql('postgres', "SELECT pg_current_xlog_insert_location()");
my $caughtup_query =
"SELECT '$until_lsn'::pg_lsn <= pg_last_xlog_replay_location()";
$node_standby->poll_query_until('postgres', $caughtup_query)
or die "Timed out while waiting for standby to catch up";
ok($node_standby->safe_psql('postgres', 'select * from foo') eq 'hello',
'standby replays past overwritten contrecord');
# Verify message appears in standby's log
my $log = slurp_file($node_standby->logfile);
like(
$log,
qr[sucessfully skipped missing contrecord at],
"found log line in standby");
$node->stop;
$node_standby->stop;

View File

@ -2902,6 +2902,7 @@ xl_logical_message
xl_multi_insert_tuple
xl_multixact_create
xl_multixact_truncate
xl_overwrite_contrecord
xl_parameter_change
xl_relmap_update
xl_replorigin_drop