Widen xl_len field of XLogRecord header to 32 bits, so that we'll have
a more tolerable limit on the number of subtransactions or deleted files
in COMMIT and ABORT records.  Buy back the extra space by eliminating the
xl_xact_prev field, which isn't being used for anything and is rather
unlikely ever to be used for anything.
This does not force initdb, but you do need to do pg_resetxlog if you
want to upgrade an existing 8.0 installation without initdb.
This commit is contained in:
Tom Lane 2004-08-29 16:34:48 +00:00
parent b6b71b85bc
commit 0ffe11abd3
4 changed files with 68 additions and 71 deletions

View File

@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2004, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2004, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.166 2004/08/29 05:06:40 momjian Exp $ * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.167 2004/08/29 16:34:47 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -409,6 +409,10 @@ static uint32 readOff = 0;
/* Buffer for currently read page (BLCKSZ bytes) */ /* Buffer for currently read page (BLCKSZ bytes) */
static char *readBuf = NULL; static char *readBuf = NULL;
/* Buffer for current ReadRecord result (expandable) */
static char *readRecordBuf = NULL;
static uint32 readRecordBufSize = 0;
/* State information for XLOG reading */ /* State information for XLOG reading */
static XLogRecPtr ReadRecPtr; static XLogRecPtr ReadRecPtr;
static XLogRecPtr EndRecPtr; static XLogRecPtr EndRecPtr;
@ -440,11 +444,9 @@ static bool RestoreArchivedFile(char *path, const char *xlogfname,
const char *recovername, off_t expectedSize); const char *recovername, off_t expectedSize);
static void PreallocXlogFiles(XLogRecPtr endptr); static void PreallocXlogFiles(XLogRecPtr endptr);
static void MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr); static void MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr);
static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, char *buffer); static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode);
static bool ValidXLOGHeader(XLogPageHeader hdr, int emode); static bool ValidXLOGHeader(XLogPageHeader hdr, int emode);
static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);
int whichChkpt,
char *buffer);
static List *readTimeLineHistory(TimeLineID targetTLI); static List *readTimeLineHistory(TimeLineID targetTLI);
static bool existsTimeLineHistory(TimeLineID probeTLI); static bool existsTimeLineHistory(TimeLineID probeTLI);
static TimeLineID findNewestTimeLine(TimeLineID startTLI); static TimeLineID findNewestTimeLine(TimeLineID startTLI);
@ -627,7 +629,7 @@ begin:;
* may not be true forever. If you need to remove the len == 0 check, * may not be true forever. If you need to remove the len == 0 check,
* also remove the check for xl_len == 0 in ReadRecord, below. * also remove the check for xl_len == 0 in ReadRecord, below.
*/ */
if (len == 0 || len > MAXLOGRECSZ) if (len == 0)
elog(PANIC, "invalid xlog record length %u", len); elog(PANIC, "invalid xlog record length %u", len);
START_CRIT_SECTION(); START_CRIT_SECTION();
@ -745,14 +747,6 @@ begin:;
/* Insert record header */ /* Insert record header */
record->xl_prev = Insert->PrevRecord; record->xl_prev = Insert->PrevRecord;
if (no_tran)
{
record->xl_xact_prev.xlogid = 0;
record->xl_xact_prev.xrecoff = 0;
}
else
record->xl_xact_prev = MyLastRecPtr;
record->xl_xid = GetCurrentTransactionId(); record->xl_xid = GetCurrentTransactionId();
record->xl_len = len; /* doesn't include backup blocks */ record->xl_len = len; /* doesn't include backup blocks */
record->xl_info = info; record->xl_info = info;
@ -2316,14 +2310,14 @@ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
* If no valid record is available, returns NULL, or fails if emode is PANIC. * If no valid record is available, returns NULL, or fails if emode is PANIC.
* (emode must be either PANIC or LOG.) * (emode must be either PANIC or LOG.)
* *
* buffer is a workspace at least _INTL_MAXLOGRECSZ bytes long. It is needed * The record is copied into readRecordBuf, so that on successful return,
* to reassemble a record that crosses block boundaries. Note that on * the returned record pointer always points there.
* successful return, the returned record pointer always points at buffer.
*/ */
static XLogRecord * static XLogRecord *
ReadRecord(XLogRecPtr *RecPtr, int emode, char *buffer) ReadRecord(XLogRecPtr *RecPtr, int emode)
{ {
XLogRecord *record; XLogRecord *record;
char *buffer;
XLogRecPtr tmpRecPtr = EndRecPtr; XLogRecPtr tmpRecPtr = EndRecPtr;
bool randAccess = false; bool randAccess = false;
uint32 len, uint32 len,
@ -2467,6 +2461,13 @@ got_record:;
RecPtr->xlogid, RecPtr->xrecoff))); RecPtr->xlogid, RecPtr->xrecoff)));
goto next_record_is_invalid; goto next_record_is_invalid;
} }
if (record->xl_rmid > RM_MAX_ID)
{
ereport(emode,
(errmsg("invalid resource manager ID %u at %X/%X",
record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff)));
goto next_record_is_invalid;
}
/* /*
* Compute total length of record including any appended backup * Compute total length of record including any appended backup
@ -2481,24 +2482,34 @@ got_record:;
} }
/* /*
* Make sure it will fit in buffer (currently, it is mechanically * Allocate or enlarge readRecordBuf as needed. To avoid useless
* impossible for this test to fail, but it seems like a good idea * small increases, round its size to a multiple of BLCKSZ, and make
* anyway). * sure it's at least 4*BLCKSZ to start with. (That is enough for
* all "normal" records, but very large commit or abort records might
* need more space.)
*/ */
if (total_len > _INTL_MAXLOGRECSZ) if (total_len > readRecordBufSize)
{ {
ereport(emode, uint32 newSize = total_len;
(errmsg("record length %u at %X/%X too long",
total_len, RecPtr->xlogid, RecPtr->xrecoff))); newSize += BLCKSZ - (newSize % BLCKSZ);
goto next_record_is_invalid; newSize = Max(newSize, 4 * BLCKSZ);
} if (readRecordBuf)
if (record->xl_rmid > RM_MAX_ID) free(readRecordBuf);
{ readRecordBuf = (char *) malloc(newSize);
ereport(emode, if (!readRecordBuf)
(errmsg("invalid resource manager ID %u at %X/%X", {
record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff))); readRecordBufSize = 0;
goto next_record_is_invalid; /* We treat this as a "bogus data" condition */
ereport(emode,
(errmsg("record length %u at %X/%X too long",
total_len, RecPtr->xlogid, RecPtr->xrecoff)));
goto next_record_is_invalid;
}
readRecordBufSize = newSize;
} }
buffer = readRecordBuf;
nextRecord = NULL; nextRecord = NULL;
len = BLCKSZ - RecPtr->xrecoff % BLCKSZ; len = BLCKSZ - RecPtr->xrecoff % BLCKSZ;
if (total_len > len) if (total_len > len)
@ -3481,8 +3492,6 @@ BootStrapXLOG(void)
record = (XLogRecord *) ((char *) page + SizeOfXLogLongPHD); record = (XLogRecord *) ((char *) page + SizeOfXLogLongPHD);
record->xl_prev.xlogid = 0; record->xl_prev.xlogid = 0;
record->xl_prev.xrecoff = 0; record->xl_prev.xrecoff = 0;
record->xl_xact_prev.xlogid = 0;
record->xl_xact_prev.xrecoff = 0;
record->xl_xid = InvalidTransactionId; record->xl_xid = InvalidTransactionId;
record->xl_len = sizeof(checkPoint); record->xl_len = sizeof(checkPoint);
record->xl_info = XLOG_CHECKPOINT_SHUTDOWN; record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
@ -3981,12 +3990,8 @@ StartupXLOG(void)
uint32 endLogId; uint32 endLogId;
uint32 endLogSeg; uint32 endLogSeg;
XLogRecord *record; XLogRecord *record;
char *buffer;
uint32 freespace; uint32 freespace;
/* Use malloc() to ensure record buffer is MAXALIGNED */
buffer = (char *) malloc(_INTL_MAXLOGRECSZ);
CritSectionCount++; CritSectionCount++;
/* /*
@ -4063,7 +4068,7 @@ StartupXLOG(void)
* from the checkpoint it identifies, rather than using * from the checkpoint it identifies, rather than using
* pg_control. * pg_control.
*/ */
record = ReadCheckpointRecord(checkPointLoc, 0, buffer); record = ReadCheckpointRecord(checkPointLoc, 0);
if (record != NULL) if (record != NULL)
{ {
ereport(LOG, ereport(LOG,
@ -4085,7 +4090,7 @@ StartupXLOG(void)
* according to pg_control is broken, try the next-to-last one. * according to pg_control is broken, try the next-to-last one.
*/ */
checkPointLoc = ControlFile->checkPoint; checkPointLoc = ControlFile->checkPoint;
record = ReadCheckpointRecord(checkPointLoc, 1, buffer); record = ReadCheckpointRecord(checkPointLoc, 1);
if (record != NULL) if (record != NULL)
{ {
ereport(LOG, ereport(LOG,
@ -4095,7 +4100,7 @@ StartupXLOG(void)
else else
{ {
checkPointLoc = ControlFile->prevCheckPoint; checkPointLoc = ControlFile->prevCheckPoint;
record = ReadCheckpointRecord(checkPointLoc, 2, buffer); record = ReadCheckpointRecord(checkPointLoc, 2);
if (record != NULL) if (record != NULL)
{ {
ereport(LOG, ereport(LOG,
@ -4198,12 +4203,12 @@ StartupXLOG(void)
if (XLByteLT(checkPoint.redo, RecPtr)) if (XLByteLT(checkPoint.redo, RecPtr))
{ {
/* back up to find the record */ /* back up to find the record */
record = ReadRecord(&(checkPoint.redo), PANIC, buffer); record = ReadRecord(&(checkPoint.redo), PANIC);
} }
else else
{ {
/* just have to read next record after CheckPoint */ /* just have to read next record after CheckPoint */
record = ReadRecord(NULL, LOG, buffer); record = ReadRecord(NULL, LOG);
} }
if (record != NULL) if (record != NULL)
@ -4263,7 +4268,7 @@ StartupXLOG(void)
LastRec = ReadRecPtr; LastRec = ReadRecPtr;
record = ReadRecord(NULL, LOG, buffer); record = ReadRecord(NULL, LOG);
} while (record != NULL && recoveryContinue); } while (record != NULL && recoveryContinue);
/* /*
@ -4287,7 +4292,7 @@ StartupXLOG(void)
* Re-fetch the last valid or last applied record, so we can identify * Re-fetch the last valid or last applied record, so we can identify
* the exact endpoint of what we consider the valid portion of WAL. * the exact endpoint of what we consider the valid portion of WAL.
*/ */
record = ReadRecord(&LastRec, PANIC, buffer); record = ReadRecord(&LastRec, PANIC);
EndOfLog = EndRecPtr; EndOfLog = EndRecPtr;
XLByteToPrevSeg(EndOfLog, endLogId, endLogSeg); XLByteToPrevSeg(EndOfLog, endLogId, endLogSeg);
@ -4404,7 +4409,7 @@ StartupXLOG(void)
RecPtr.xlogid, RecPtr.xrecoff))); RecPtr.xlogid, RecPtr.xrecoff)));
do do
{ {
record = ReadRecord(&RecPtr, PANIC, buffer); record = ReadRecord(&RecPtr, PANIC);
if (TransactionIdIsValid(record->xl_xid) && if (TransactionIdIsValid(record->xl_xid) &&
!TransactionIdDidCommit(record->xl_xid)) !TransactionIdDidCommit(record->xl_xid))
RmgrTable[record->xl_rmid].rm_undo(EndRecPtr, record); RmgrTable[record->xl_rmid].rm_undo(EndRecPtr, record);
@ -4498,8 +4503,12 @@ StartupXLOG(void)
free(readBuf); free(readBuf);
readBuf = NULL; readBuf = NULL;
} }
if (readRecordBuf)
free(buffer); {
free(readRecordBuf);
readRecordBuf = NULL;
readRecordBufSize = 0;
}
} }
/* /*
@ -4509,9 +4518,7 @@ StartupXLOG(void)
* 1 for "primary", 2 for "secondary", 0 for "other" (backup_label) * 1 for "primary", 2 for "secondary", 0 for "other" (backup_label)
*/ */
static XLogRecord * static XLogRecord *
ReadCheckpointRecord(XLogRecPtr RecPtr, ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
int whichChkpt,
char *buffer)
{ {
XLogRecord *record; XLogRecord *record;
@ -4535,7 +4542,7 @@ ReadCheckpointRecord(XLogRecPtr RecPtr,
return NULL; return NULL;
} }
record = ReadRecord(&RecPtr, LOG, buffer); record = ReadRecord(&RecPtr, LOG);
if (record == NULL) if (record == NULL)
{ {
@ -5080,9 +5087,8 @@ xlog_outrec(char *buf, XLogRecord *record)
int bkpb; int bkpb;
int i; int i;
sprintf(buf + strlen(buf), "prev %X/%X; xprev %X/%X; xid %u", sprintf(buf + strlen(buf), "prev %X/%X; xid %u",
record->xl_prev.xlogid, record->xl_prev.xrecoff, record->xl_prev.xlogid, record->xl_prev.xrecoff,
record->xl_xact_prev.xlogid, record->xl_xact_prev.xrecoff,
record->xl_xid); record->xl_xid);
for (i = 0, bkpb = 0; i < XLR_MAX_BKP_BLOCKS; i++) for (i = 0, bkpb = 0; i < XLR_MAX_BKP_BLOCKS; i++)

View File

@ -23,7 +23,7 @@
* Portions Copyright (c) 1996-2004, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2004, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/bin/pg_resetxlog/pg_resetxlog.c,v 1.23 2004/08/29 05:06:54 momjian Exp $ * $PostgreSQL: pgsql/src/bin/pg_resetxlog/pg_resetxlog.c,v 1.24 2004/08/29 16:34:48 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -645,8 +645,6 @@ WriteEmptyXLOG(void)
record = (XLogRecord *) ((char *) page + SizeOfXLogLongPHD); record = (XLogRecord *) ((char *) page + SizeOfXLogLongPHD);
record->xl_prev.xlogid = 0; record->xl_prev.xlogid = 0;
record->xl_prev.xrecoff = 0; record->xl_prev.xrecoff = 0;
record->xl_xact_prev.xlogid = 0;
record->xl_xact_prev.xrecoff = 0;
record->xl_xid = InvalidTransactionId; record->xl_xid = InvalidTransactionId;
record->xl_len = sizeof(CheckPoint); record->xl_len = sizeof(CheckPoint);
record->xl_info = XLOG_CHECKPOINT_SHUTDOWN; record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;

View File

@ -6,7 +6,7 @@
* Portions Copyright (c) 1996-2004, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2004, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/include/access/xlog.h,v 1.57 2004/08/29 05:06:55 momjian Exp $ * $PostgreSQL: pgsql/src/include/access/xlog.h,v 1.58 2004/08/29 16:34:48 tgl Exp $
*/ */
#ifndef XLOG_H #ifndef XLOG_H
#define XLOG_H #define XLOG_H
@ -35,18 +35,18 @@ typedef struct XLogRecord
{ {
crc64 xl_crc; /* CRC for this record */ crc64 xl_crc; /* CRC for this record */
XLogRecPtr xl_prev; /* ptr to previous record in log */ XLogRecPtr xl_prev; /* ptr to previous record in log */
XLogRecPtr xl_xact_prev; /* ptr to previous record of this xact */
TransactionId xl_xid; /* xact id */ TransactionId xl_xid; /* xact id */
uint16 xl_len; /* total len of rmgr data */ uint32 xl_len; /* total len of rmgr data */
uint8 xl_info; /* flag bits, see below */ uint8 xl_info; /* flag bits, see below */
RmgrId xl_rmid; /* resource manager for this record */ RmgrId xl_rmid; /* resource manager for this record */
/* Depending on MAXALIGN, there are either 2 or 6 wasted bytes here */
/* ACTUAL LOG DATA FOLLOWS AT END OF STRUCT */ /* ACTUAL LOG DATA FOLLOWS AT END OF STRUCT */
} XLogRecord; } XLogRecord;
#define SizeOfXLogRecord MAXALIGN(sizeof(XLogRecord)) #define SizeOfXLogRecord MAXALIGN(sizeof(XLogRecord))
#define MAXLOGRECSZ 65535 /* the most that'll fit in xl_len */
#define XLogRecGetData(record) ((char*) (record) + SizeOfXLogRecord) #define XLogRecGetData(record) ((char*) (record) + SizeOfXLogRecord)

View File

@ -11,7 +11,7 @@
* Portions Copyright (c) 1996-2004, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2004, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/include/access/xlog_internal.h,v 1.4 2004/08/29 05:06:55 momjian Exp $ * $PostgreSQL: pgsql/src/include/access/xlog_internal.h,v 1.5 2004/08/29 16:34:48 tgl Exp $
*/ */
#ifndef XLOG_INTERNAL_H #ifndef XLOG_INTERNAL_H
#define XLOG_INTERNAL_H #define XLOG_INTERNAL_H
@ -58,7 +58,7 @@ typedef struct XLogContRecord
/* /*
* Each page of XLOG file has a header like this: * Each page of XLOG file has a header like this:
*/ */
#define XLOG_PAGE_MAGIC 0xD05B /* can be used as WAL version indicator */ #define XLOG_PAGE_MAGIC 0xD05C /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData typedef struct XLogPageHeaderData
{ {
@ -203,13 +203,6 @@ typedef XLogLongPageHeaderData *XLogLongPageHeader;
extern char XLogDir[MAXPGPATH]; extern char XLogDir[MAXPGPATH];
/*
* _INTL_MAXLOGRECSZ: max space needed for a record including header and
* any backup-block data.
*/
#define _INTL_MAXLOGRECSZ (SizeOfXLogRecord + MAXLOGRECSZ + \
XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
/* /*
* Method table for resource managers. * Method table for resource managers.