pg_walinspect: fix case where flush LSN is in the middle of a record.

Instability in the test for pg_walinspect revealed that
pg_get_wal_records_info_till_end_of_wal(x) would try to decode all the
records with a start LSN earlier than the flush LSN, even though that
might include a partial record at the end of the range. In that case,
read_local_xlog_page_no_wait() would return NULL when it tried to read
past the flush LSN, which would be interpreted as an error by the
caller. That caused a test failure only on a BF animal that had been
restarted recently, but could be expected to happen in the wild quite
easily depending on the alignment of various parameters.

Fix by using private data in read_local_xlog_page_no_wait() to signal
end-of-wal to the caller, so that it can be properly distinguished
from a real error.

Discussion: https://postgr.es/m/Ymd/e5eeZMNAkrXo%40paquier.xyz
Discussion: https://postgr.es/m/111657.1650910309@sss.pgh.pa.us

Authors: Thomas Munro, Bharath Rupireddy.
This commit is contained in:
Jeff Davis 2022-04-30 08:28:33 -07:00
parent ccd10a9bfa
commit ed57cac84d
3 changed files with 50 additions and 26 deletions

View File

@ -89,6 +89,7 @@ static XLogReaderState *
InitXLogReaderState(XLogRecPtr lsn, XLogRecPtr *first_record)
{
XLogReaderState *xlogreader;
ReadLocalXLogPageNoWaitPrivate *private_data;
/*
* Reading WAL below the first page of the first segments isn't allowed.
@ -100,11 +101,14 @@ InitXLogReaderState(XLogRecPtr lsn, XLogRecPtr *first_record)
(errmsg("could not read WAL at LSN %X/%X",
LSN_FORMAT_ARGS(lsn))));
private_data = (ReadLocalXLogPageNoWaitPrivate *)
palloc0(sizeof(ReadLocalXLogPageNoWaitPrivate));
xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
XL_ROUTINE(.page_read = &read_local_xlog_page_no_wait,
.segment_open = &wal_segment_open,
.segment_close = &wal_segment_close),
NULL);
private_data);
if (xlogreader == NULL)
ereport(ERROR,
@ -132,7 +136,8 @@ InitXLogReaderState(XLogRecPtr lsn, XLogRecPtr *first_record)
*
* We guard against ordinary errors trying to read WAL that hasn't been
* written yet by limiting end_lsn to the flushed WAL, but that can also
* encounter errors if the flush pointer falls in the middle of a record.
* encounter errors if the flush pointer falls in the middle of a record. In
* that case we'll return NULL.
*/
static XLogRecord *
ReadNextXLogRecord(XLogReaderState *xlogreader, XLogRecPtr first_record)
@ -144,6 +149,15 @@ ReadNextXLogRecord(XLogReaderState *xlogreader, XLogRecPtr first_record)
if (record == NULL)
{
ReadLocalXLogPageNoWaitPrivate *private_data;
/* return NULL, if end of WAL is reached */
private_data = (ReadLocalXLogPageNoWaitPrivate *)
xlogreader->private_data;
if (private_data->end_of_wal)
return NULL;
if (errormsg)
ereport(ERROR,
(errcode_for_file_access(),
@ -246,7 +260,11 @@ pg_get_wal_record_info(PG_FUNCTION_ARGS)
xlogreader = InitXLogReaderState(lsn, &first_record);
(void) ReadNextXLogRecord(xlogreader, first_record);
if (!ReadNextXLogRecord(xlogreader, first_record))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not read WAL at %X/%X",
LSN_FORMAT_ARGS(first_record))));
MemSet(values, 0, sizeof(values));
MemSet(nulls, 0, sizeof(nulls));
@ -254,6 +272,7 @@ pg_get_wal_record_info(PG_FUNCTION_ARGS)
GetWALRecordInfo(xlogreader, first_record, values, nulls,
PG_GET_WAL_RECORD_INFO_COLS);
pfree(xlogreader->private_data);
XLogReaderFree(xlogreader);
tuple = heap_form_tuple(tupdesc, values, nulls);
@ -327,26 +346,19 @@ GetWALRecordsInfo(FunctionCallInfo fcinfo, XLogRecPtr start_lsn,
MemSet(values, 0, sizeof(values));
MemSet(nulls, 0, sizeof(nulls));
for (;;)
while (ReadNextXLogRecord(xlogreader, first_record) &&
xlogreader->EndRecPtr <= end_lsn)
{
(void) ReadNextXLogRecord(xlogreader, first_record);
GetWALRecordInfo(xlogreader, xlogreader->currRecPtr, values, nulls,
PG_GET_WAL_RECORDS_INFO_COLS);
if (xlogreader->EndRecPtr <= end_lsn)
{
GetWALRecordInfo(xlogreader, xlogreader->currRecPtr, values, nulls,
PG_GET_WAL_RECORDS_INFO_COLS);
tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
values, nulls);
}
/* if we read up to end_lsn, we're done */
if (xlogreader->EndRecPtr >= end_lsn)
break;
tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
values, nulls);
CHECK_FOR_INTERRUPTS();
}
pfree(xlogreader->private_data);
XLogReaderFree(xlogreader);
#undef PG_GET_WAL_RECORDS_INFO_COLS
@ -555,20 +567,15 @@ GetWalStats(FunctionCallInfo fcinfo, XLogRecPtr start_lsn,
MemSet(&stats, 0, sizeof(stats));
for (;;)
while (ReadNextXLogRecord(xlogreader, first_record) &&
xlogreader->EndRecPtr <= end_lsn)
{
(void) ReadNextXLogRecord(xlogreader, first_record);
if (xlogreader->EndRecPtr <= end_lsn)
XLogRecStoreStats(&stats, xlogreader);
/* if we read up to end_lsn, we're done */
if (xlogreader->EndRecPtr >= end_lsn)
break;
XLogRecStoreStats(&stats, xlogreader);
CHECK_FOR_INTERRUPTS();
}
pfree(xlogreader->private_data);
XLogReaderFree(xlogreader);
MemSet(values, 0, sizeof(values));

View File

@ -957,7 +957,18 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr,
/* If asked, let's not wait for future WAL. */
if (!wait_for_wal)
{
ReadLocalXLogPageNoWaitPrivate *private_data;
/*
* Inform the caller of read_local_xlog_page_no_wait that the
* end of WAL has been reached.
*/
private_data = (ReadLocalXLogPageNoWaitPrivate *)
state->private_data;
private_data->end_of_wal = true;
break;
}
CHECK_FOR_INTERRUPTS();
pg_usleep(1000L);

View File

@ -75,6 +75,12 @@ typedef enum
* need to be replayed) */
} XLogRedoAction;
/* Private data of the read_local_xlog_page_no_wait callback. */
typedef struct ReadLocalXLogPageNoWaitPrivate
{
bool end_of_wal; /* true, when end of WAL is reached */
} ReadLocalXLogPageNoWaitPrivate;
extern XLogRedoAction XLogReadBufferForRedo(XLogReaderState *record,
uint8 buffer_id, Buffer *buf);
extern Buffer XLogInitBufferForRedo(XLogReaderState *record, uint8 block_id);