Repair sometimes-incorrect computation of StartUpID after a crash, per

example from Rao Kumar.  This is a very corner corner-case, requiring
a minimum of three closely-spaced database crashes and an unlucky
positioning of the second recovery's checkpoint record before you'd notice
any problem.  But the consequences are dire enough that it's a must-fix.
This commit is contained in:
Tom Lane 2003-05-22 14:39:28 +00:00
parent baba07173c
commit 39e98d9563

View File

@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.115 2003/05/10 18:01:31 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.116 2003/05/22 14:39:28 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -2568,7 +2568,18 @@ StartupXLOG(void)
ShmemVariableCache->nextOid = checkPoint.nextOid; ShmemVariableCache->nextOid = checkPoint.nextOid;
ShmemVariableCache->oidCount = 0; ShmemVariableCache->oidCount = 0;
ThisStartUpID = checkPoint.ThisStartUpID; /*
* If it was a shutdown checkpoint, then any following WAL entries were
* created under the next StartUpID; if it was a regular checkpoint then
* any following WAL entries were created under the same StartUpID.
* We must replay WAL entries using the same StartUpID they were created
* under, so temporarily adopt that SUI (see also xlog_redo()).
*/
if (wasShutdown)
ThisStartUpID = checkPoint.ThisStartUpID + 1;
else
ThisStartUpID = checkPoint.ThisStartUpID;
RedoRecPtr = XLogCtl->Insert.RedoRecPtr = RedoRecPtr = XLogCtl->Insert.RedoRecPtr =
XLogCtl->SavedRedoRecPtr = checkPoint.redo; XLogCtl->SavedRedoRecPtr = checkPoint.redo;
@ -2672,55 +2683,21 @@ StartupXLOG(void)
ControlFile->logSeg = openLogSeg + 1; ControlFile->logSeg = openLogSeg + 1;
Insert = &XLogCtl->Insert; Insert = &XLogCtl->Insert;
Insert->PrevRecord = LastRec; Insert->PrevRecord = LastRec;
XLogCtl->xlblocks[0].xlogid = openLogId;
XLogCtl->xlblocks[0].xrecoff =
((EndOfLog.xrecoff - 1) / BLCKSZ + 1) * BLCKSZ;
/* /*
* If the next record will go to the new page then initialize for that * Tricky point here: readBuf contains the *last* block that the
* one. * LastRec record spans, not the one it starts in. The last block
* is indeed the one we want to use.
*/ */
if ((BLCKSZ - EndOfLog.xrecoff % BLCKSZ) < SizeOfXLogRecord) Assert(readOff == (XLogCtl->xlblocks[0].xrecoff - BLCKSZ) % XLogSegSize);
EndOfLog.xrecoff += (BLCKSZ - EndOfLog.xrecoff % BLCKSZ); memcpy((char *) Insert->currpage, readBuf, BLCKSZ);
if (EndOfLog.xrecoff % BLCKSZ == 0) Insert->currpos = (char *) Insert->currpage +
{ (EndOfLog.xrecoff + BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
XLogRecPtr NewPageEndPtr; /* Make sure rest of page is zero */
MemSet(Insert->currpos, 0, INSERT_FREESPACE(Insert));
NewPageEndPtr = EndOfLog;
if (NewPageEndPtr.xrecoff >= XLogFileSize)
{
/* crossing a logid boundary */
NewPageEndPtr.xlogid += 1;
NewPageEndPtr.xrecoff = BLCKSZ;
}
else
NewPageEndPtr.xrecoff += BLCKSZ;
XLogCtl->xlblocks[0] = NewPageEndPtr;
Insert->currpage->xlp_magic = XLOG_PAGE_MAGIC;
if (InRecovery)
Insert->currpage->xlp_sui = ThisStartUpID;
else
Insert->currpage->xlp_sui = ThisStartUpID + 1;
Insert->currpage->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid;
Insert->currpage->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - BLCKSZ;
/* rest of buffer was zeroed in XLOGShmemInit */
Insert->currpos = (char *) Insert->currpage + SizeOfXLogPHD;
}
else
{
XLogCtl->xlblocks[0].xlogid = openLogId;
XLogCtl->xlblocks[0].xrecoff =
((EndOfLog.xrecoff - 1) / BLCKSZ + 1) * BLCKSZ;
/*
* Tricky point here: readBuf contains the *last* block that the
* LastRec record spans, not the one it starts in. The last block
* is indeed the one we want to use.
*/
Assert(readOff == (XLogCtl->xlblocks[0].xrecoff - BLCKSZ) % XLogSegSize);
memcpy((char *) Insert->currpage, readBuf, BLCKSZ);
Insert->currpos = (char *) Insert->currpage +
(EndOfLog.xrecoff + BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
/* Make sure rest of page is zero */
memset(Insert->currpos, 0, INSERT_FREESPACE(Insert));
}
LogwrtResult.Write = LogwrtResult.Flush = EndOfLog; LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
@ -2774,9 +2751,22 @@ StartupXLOG(void)
MyXactMadeXLogEntry = false; MyXactMadeXLogEntry = false;
MyXactMadeTempRelUpdate = false; MyXactMadeTempRelUpdate = false;
/*
* At this point, ThisStartUpID is the largest SUI that we could
* find evidence for in the WAL entries. But check it against
* pg_control's latest checkpoint, to make sure that we can't
* accidentally re-use an already-used SUI.
*/
if (ThisStartUpID < ControlFile->checkPointCopy.ThisStartUpID)
ThisStartUpID = ControlFile->checkPointCopy.ThisStartUpID;
/* /*
* Perform a new checkpoint to update our recovery activity to disk. * Perform a new checkpoint to update our recovery activity to disk.
* *
* Note that we write a shutdown checkpoint. This is correct since
* the records following it will use SUI one more than what is shown
* in the checkpoint's ThisStartUpID.
*
* In case we had to use the secondary checkpoint, make sure that * In case we had to use the secondary checkpoint, make sure that
* it will still be shown as the secondary checkpoint after this * it will still be shown as the secondary checkpoint after this
* CreateCheckPoint operation; we don't want the broken primary * CreateCheckPoint operation; we don't want the broken primary
@ -2790,21 +2780,39 @@ StartupXLOG(void)
*/ */
XLogCloseRelationCache(); XLogCloseRelationCache();
} }
else
{
/*
* If we are not doing recovery, then we saw a checkpoint with nothing
* after it, and we can safely use StartUpID equal to one more than
* the checkpoint's SUI. But just for paranoia's sake, check against
* pg_control too.
*/
ThisStartUpID = checkPoint.ThisStartUpID;
if (ThisStartUpID < ControlFile->checkPointCopy.ThisStartUpID)
ThisStartUpID = ControlFile->checkPointCopy.ThisStartUpID;
}
/* /*
* Preallocate additional log files, if wanted. * Preallocate additional log files, if wanted.
*/ */
PreallocXlogFiles(EndOfLog); PreallocXlogFiles(EndOfLog);
/*
* Advance StartUpID to one more than the highest value used previously.
*/
ThisStartUpID++;
XLogCtl->ThisStartUpID = ThisStartUpID;
/*
* Okay, we're officially UP.
*/
InRecovery = false; InRecovery = false;
ControlFile->state = DB_IN_PRODUCTION; ControlFile->state = DB_IN_PRODUCTION;
ControlFile->time = time(NULL); ControlFile->time = time(NULL);
UpdateControlFile(); UpdateControlFile();
ThisStartUpID++;
XLogCtl->ThisStartUpID = ThisStartUpID;
/* Start up the commit log, too */ /* Start up the commit log, too */
StartupCLOG(); StartupCLOG();
@ -3000,7 +3008,7 @@ CreateCheckPoint(bool shutdown, bool force)
UpdateControlFile(); UpdateControlFile();
} }
memset(&checkPoint, 0, sizeof(checkPoint)); MemSet(&checkPoint, 0, sizeof(checkPoint));
checkPoint.ThisStartUpID = ThisStartUpID; checkPoint.ThisStartUpID = ThisStartUpID;
checkPoint.time = time(NULL); checkPoint.time = time(NULL);
@ -3259,6 +3267,8 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
ShmemVariableCache->nextXid = checkPoint.nextXid; ShmemVariableCache->nextXid = checkPoint.nextXid;
ShmemVariableCache->nextOid = checkPoint.nextOid; ShmemVariableCache->nextOid = checkPoint.nextOid;
ShmemVariableCache->oidCount = 0; ShmemVariableCache->oidCount = 0;
/* Any later WAL records should be run with shutdown SUI plus 1 */
ThisStartUpID = checkPoint.ThisStartUpID + 1;
} }
else if (info == XLOG_CHECKPOINT_ONLINE) else if (info == XLOG_CHECKPOINT_ONLINE)
{ {
@ -3274,6 +3284,8 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
ShmemVariableCache->nextOid = checkPoint.nextOid; ShmemVariableCache->nextOid = checkPoint.nextOid;
ShmemVariableCache->oidCount = 0; ShmemVariableCache->oidCount = 0;
} }
/* Any later WAL records should be run with the then-active SUI */
ThisStartUpID = checkPoint.ThisStartUpID;
} }
} }