From 39e98d9563a48ddf5b25e89bad91b391f5af9690 Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Thu, 22 May 2003 14:39:28 +0000 Subject: [PATCH] Repair sometimes-incorrect computation of StartUpID after a crash, per example from Rao Kumar. This is a very corner corner-case, requiring a minimum of three closely-spaced database crashes and an unlucky positioning of the second recovery's checkpoint record before you'd notice any problem. But the consequences are dire enough that it's a must-fix. --- src/backend/access/transam/xlog.c | 116 ++++++++++++++++-------------- 1 file changed, 64 insertions(+), 52 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index f68fc70536..7fa274922b 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.115 2003/05/10 18:01:31 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.116 2003/05/22 14:39:28 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -2568,7 +2568,18 @@ StartupXLOG(void) ShmemVariableCache->nextOid = checkPoint.nextOid; ShmemVariableCache->oidCount = 0; - ThisStartUpID = checkPoint.ThisStartUpID; + /* + * If it was a shutdown checkpoint, then any following WAL entries were + * created under the next StartUpID; if it was a regular checkpoint then + * any following WAL entries were created under the same StartUpID. + * We must replay WAL entries using the same StartUpID they were created + * under, so temporarily adopt that SUI (see also xlog_redo()). + */ + if (wasShutdown) + ThisStartUpID = checkPoint.ThisStartUpID + 1; + else + ThisStartUpID = checkPoint.ThisStartUpID; + RedoRecPtr = XLogCtl->Insert.RedoRecPtr = XLogCtl->SavedRedoRecPtr = checkPoint.redo; @@ -2672,55 +2683,21 @@ StartupXLOG(void) ControlFile->logSeg = openLogSeg + 1; Insert = &XLogCtl->Insert; Insert->PrevRecord = LastRec; + XLogCtl->xlblocks[0].xlogid = openLogId; + XLogCtl->xlblocks[0].xrecoff = + ((EndOfLog.xrecoff - 1) / BLCKSZ + 1) * BLCKSZ; /* - * If the next record will go to the new page then initialize for that - * one. + * Tricky point here: readBuf contains the *last* block that the + * LastRec record spans, not the one it starts in. The last block + * is indeed the one we want to use. */ - if ((BLCKSZ - EndOfLog.xrecoff % BLCKSZ) < SizeOfXLogRecord) - EndOfLog.xrecoff += (BLCKSZ - EndOfLog.xrecoff % BLCKSZ); - if (EndOfLog.xrecoff % BLCKSZ == 0) - { - XLogRecPtr NewPageEndPtr; - - NewPageEndPtr = EndOfLog; - if (NewPageEndPtr.xrecoff >= XLogFileSize) - { - /* crossing a logid boundary */ - NewPageEndPtr.xlogid += 1; - NewPageEndPtr.xrecoff = BLCKSZ; - } - else - NewPageEndPtr.xrecoff += BLCKSZ; - XLogCtl->xlblocks[0] = NewPageEndPtr; - Insert->currpage->xlp_magic = XLOG_PAGE_MAGIC; - if (InRecovery) - Insert->currpage->xlp_sui = ThisStartUpID; - else - Insert->currpage->xlp_sui = ThisStartUpID + 1; - Insert->currpage->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid; - Insert->currpage->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - BLCKSZ; - /* rest of buffer was zeroed in XLOGShmemInit */ - Insert->currpos = (char *) Insert->currpage + SizeOfXLogPHD; - } - else - { - XLogCtl->xlblocks[0].xlogid = openLogId; - XLogCtl->xlblocks[0].xrecoff = - ((EndOfLog.xrecoff - 1) / BLCKSZ + 1) * BLCKSZ; - - /* - * Tricky point here: readBuf contains the *last* block that the - * LastRec record spans, not the one it starts in. The last block - * is indeed the one we want to use. - */ - Assert(readOff == (XLogCtl->xlblocks[0].xrecoff - BLCKSZ) % XLogSegSize); - memcpy((char *) Insert->currpage, readBuf, BLCKSZ); - Insert->currpos = (char *) Insert->currpage + - (EndOfLog.xrecoff + BLCKSZ - XLogCtl->xlblocks[0].xrecoff); - /* Make sure rest of page is zero */ - memset(Insert->currpos, 0, INSERT_FREESPACE(Insert)); - } + Assert(readOff == (XLogCtl->xlblocks[0].xrecoff - BLCKSZ) % XLogSegSize); + memcpy((char *) Insert->currpage, readBuf, BLCKSZ); + Insert->currpos = (char *) Insert->currpage + + (EndOfLog.xrecoff + BLCKSZ - XLogCtl->xlblocks[0].xrecoff); + /* Make sure rest of page is zero */ + MemSet(Insert->currpos, 0, INSERT_FREESPACE(Insert)); LogwrtResult.Write = LogwrtResult.Flush = EndOfLog; @@ -2774,9 +2751,22 @@ StartupXLOG(void) MyXactMadeXLogEntry = false; MyXactMadeTempRelUpdate = false; + /* + * At this point, ThisStartUpID is the largest SUI that we could + * find evidence for in the WAL entries. But check it against + * pg_control's latest checkpoint, to make sure that we can't + * accidentally re-use an already-used SUI. + */ + if (ThisStartUpID < ControlFile->checkPointCopy.ThisStartUpID) + ThisStartUpID = ControlFile->checkPointCopy.ThisStartUpID; + /* * Perform a new checkpoint to update our recovery activity to disk. * + * Note that we write a shutdown checkpoint. This is correct since + * the records following it will use SUI one more than what is shown + * in the checkpoint's ThisStartUpID. + * * In case we had to use the secondary checkpoint, make sure that * it will still be shown as the secondary checkpoint after this * CreateCheckPoint operation; we don't want the broken primary @@ -2790,21 +2780,39 @@ StartupXLOG(void) */ XLogCloseRelationCache(); } + else + { + /* + * If we are not doing recovery, then we saw a checkpoint with nothing + * after it, and we can safely use StartUpID equal to one more than + * the checkpoint's SUI. But just for paranoia's sake, check against + * pg_control too. + */ + ThisStartUpID = checkPoint.ThisStartUpID; + if (ThisStartUpID < ControlFile->checkPointCopy.ThisStartUpID) + ThisStartUpID = ControlFile->checkPointCopy.ThisStartUpID; + } /* * Preallocate additional log files, if wanted. */ PreallocXlogFiles(EndOfLog); + /* + * Advance StartUpID to one more than the highest value used previously. + */ + ThisStartUpID++; + XLogCtl->ThisStartUpID = ThisStartUpID; + + /* + * Okay, we're officially UP. + */ InRecovery = false; ControlFile->state = DB_IN_PRODUCTION; ControlFile->time = time(NULL); UpdateControlFile(); - ThisStartUpID++; - XLogCtl->ThisStartUpID = ThisStartUpID; - /* Start up the commit log, too */ StartupCLOG(); @@ -3000,7 +3008,7 @@ CreateCheckPoint(bool shutdown, bool force) UpdateControlFile(); } - memset(&checkPoint, 0, sizeof(checkPoint)); + MemSet(&checkPoint, 0, sizeof(checkPoint)); checkPoint.ThisStartUpID = ThisStartUpID; checkPoint.time = time(NULL); @@ -3259,6 +3267,8 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record) ShmemVariableCache->nextXid = checkPoint.nextXid; ShmemVariableCache->nextOid = checkPoint.nextOid; ShmemVariableCache->oidCount = 0; + /* Any later WAL records should be run with shutdown SUI plus 1 */ + ThisStartUpID = checkPoint.ThisStartUpID + 1; } else if (info == XLOG_CHECKPOINT_ONLINE) { @@ -3274,6 +3284,8 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record) ShmemVariableCache->nextOid = checkPoint.nextOid; ShmemVariableCache->oidCount = 0; } + /* Any later WAL records should be run with the then-active SUI */ + ThisStartUpID = checkPoint.ThisStartUpID; } }