Add locking around WAL-replay modification of shared-memory variables.

Originally, most of this code assumed that no Postgres backends could be
running concurrently with it, and so no locking could be needed.  That
assumption fails in Hot Standby.  While it's still true that Hot Standby
backends should never change values like nextXid, they can examine them,
and consistency is important in some cases such as when computing a
snapshot.  Therefore, prudence requires that WAL replay code obtain the
relevant locks when modifying such variables, even though it can examine
them without taking a lock.  We were following that coding rule in some
places but not all.  This commit applies the coding rule uniformly to all
updates of ShmemVariableCache and MultiXactState fields; a search of the
replay routines did not find any other cases that seemed to be at risk.

In addition, this commit fixes a longstanding thinko in replay of NEXTOID
and checkpoint records: we tried to advance nextOid only if it was behind
the value in the WAL record, but the comparison would draw the wrong
conclusion if OID wraparound had occurred since the previous value.
Better to just unconditionally assign the new value, since OID assignment
shouldn't be happening during replay anyway.

The additional locking seems to be more in the nature of future-proofing
than fixing any live bug, so I am not going to back-patch it.  The NEXTOID
fix will be back-patched separately.
This commit is contained in:
Tom Lane 2012-02-06 12:34:10 -05:00
parent 96abd81744
commit c6d76d7c82
4 changed files with 75 additions and 20 deletions

View File

@ -1655,8 +1655,9 @@ CheckPointMultiXact(void)
* Set the next-to-be-assigned MultiXactId and offset
*
* This is used when we can determine the correct next ID/offset exactly
* from a checkpoint record. We need no locking since it is only called
* during bootstrap and XLog replay.
* from a checkpoint record. Although this is only called during bootstrap
* and XLog replay, we take the lock in case any hot-standby backends are
* examining the values.
*/
void
MultiXactSetNextMXact(MultiXactId nextMulti,
@ -1664,8 +1665,10 @@ MultiXactSetNextMXact(MultiXactId nextMulti,
{
debug_elog4(DEBUG2, "MultiXact: setting next multi to %u offset %u",
nextMulti, nextMultiOffset);
LWLockAcquire(MultiXactGenLock, LW_EXCLUSIVE);
MultiXactState->nextMXact = nextMulti;
MultiXactState->nextOffset = nextMultiOffset;
LWLockRelease(MultiXactGenLock);
}
/*
@ -1674,12 +1677,14 @@ MultiXactSetNextMXact(MultiXactId nextMulti,
*
* This is used when we can determine minimum safe values from an XLog
* record (either an on-line checkpoint or an mxact creation log entry).
* We need no locking since it is only called during XLog replay.
* Although this is only called during XLog replay, we take the lock in case
* any hot-standby backends are examining the values.
*/
void
MultiXactAdvanceNextMXact(MultiXactId minMulti,
MultiXactOffset minMultiOffset)
{
LWLockAcquire(MultiXactGenLock, LW_EXCLUSIVE);
if (MultiXactIdPrecedes(MultiXactState->nextMXact, minMulti))
{
debug_elog3(DEBUG2, "MultiXact: setting next multi to %u", minMulti);
@ -1691,6 +1696,7 @@ MultiXactAdvanceNextMXact(MultiXactId minMulti,
minMultiOffset);
MultiXactState->nextOffset = minMultiOffset;
}
LWLockRelease(MultiXactGenLock);
}
/*

View File

@ -1715,6 +1715,10 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
/*
* Examine subtransaction XIDs ... they should all follow main
* XID, and they may force us to advance nextXid.
*
* We don't expect anyone else to modify nextXid, hence we don't
* need to hold a lock while examining it. We still acquire the
* lock to modify it, though.
*/
subxids = (TransactionId *)
(buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
@ -1726,8 +1730,10 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
if (TransactionIdFollowsOrEquals(subxid,
ShmemVariableCache->nextXid))
{
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
ShmemVariableCache->nextXid = subxid;
TransactionIdAdvance(ShmemVariableCache->nextXid);
LWLockRelease(XidGenLock);
}
}

View File

@ -6625,12 +6625,20 @@ StartupXLOG(void)
errcontext.previous = error_context_stack;
error_context_stack = &errcontext;
/* nextXid must be beyond record's xid */
/*
* ShmemVariableCache->nextXid must be beyond record's xid.
*
* We don't expect anyone else to modify nextXid, hence we
* don't need to hold a lock while examining it. We still
* acquire the lock to modify it, though.
*/
if (TransactionIdFollowsOrEquals(record->xl_xid,
ShmemVariableCache->nextXid))
{
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
ShmemVariableCache->nextXid = record->xl_xid;
TransactionIdAdvance(ShmemVariableCache->nextXid);
LWLockRelease(XidGenLock);
}
/*
@ -6656,6 +6664,7 @@ StartupXLOG(void)
TransactionIdIsValid(record->xl_xid))
RecordKnownAssignedTransactionIds(record->xl_xid);
/* Now apply the WAL record itself */
RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
/* Pop the error context stack */
@ -6971,8 +6980,10 @@ StartupXLOG(void)
XLogCtl->ckptXid = ControlFile->checkPointCopy.nextXid;
/* also initialize latestCompletedXid, to nextXid - 1 */
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
ShmemVariableCache->latestCompletedXid = ShmemVariableCache->nextXid;
TransactionIdRetreat(ShmemVariableCache->latestCompletedXid);
LWLockRelease(ProcArrayLock);
/*
* Start up the commit log and subtrans, if not already done for
@ -8547,12 +8558,18 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
{
Oid nextOid;
/*
* We used to try to take the maximum of ShmemVariableCache->nextOid
* and the recorded nextOid, but that fails if the OID counter wraps
* around. Since no OID allocation should be happening during replay
* anyway, better to just believe the record exactly. We still take
* OidGenLock while setting the variable, just in case.
*/
memcpy(&nextOid, XLogRecGetData(record), sizeof(Oid));
if (ShmemVariableCache->nextOid < nextOid)
{
ShmemVariableCache->nextOid = nextOid;
ShmemVariableCache->oidCount = 0;
}
LWLockAcquire(OidGenLock, LW_EXCLUSIVE);
ShmemVariableCache->nextOid = nextOid;
ShmemVariableCache->oidCount = 0;
LWLockRelease(OidGenLock);
}
else if (info == XLOG_CHECKPOINT_SHUTDOWN)
{
@ -8560,9 +8577,13 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
/* In a SHUTDOWN checkpoint, believe the counters exactly */
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
ShmemVariableCache->nextXid = checkPoint.nextXid;
LWLockRelease(XidGenLock);
LWLockAcquire(OidGenLock, LW_EXCLUSIVE);
ShmemVariableCache->nextOid = checkPoint.nextOid;
ShmemVariableCache->oidCount = 0;
LWLockRelease(OidGenLock);
MultiXactSetNextMXact(checkPoint.nextMulti,
checkPoint.nextMultiOffset);
SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
@ -8575,7 +8596,7 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
if (InArchiveRecovery &&
!XLogRecPtrIsInvalid(ControlFile->backupStartPoint) &&
XLogRecPtrIsInvalid(ControlFile->backupEndPoint))
ereport(ERROR,
ereport(PANIC,
(errmsg("online backup was canceled, recovery cannot continue")));
/*
@ -8641,15 +8662,17 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
CheckPoint checkPoint;
memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
/* In an ONLINE checkpoint, treat the counters like NEXTOID */
/* In an ONLINE checkpoint, treat the XID counter as a minimum */
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
if (TransactionIdPrecedes(ShmemVariableCache->nextXid,
checkPoint.nextXid))
ShmemVariableCache->nextXid = checkPoint.nextXid;
if (ShmemVariableCache->nextOid < checkPoint.nextOid)
{
ShmemVariableCache->nextOid = checkPoint.nextOid;
ShmemVariableCache->oidCount = 0;
}
LWLockRelease(XidGenLock);
/* ... but still treat OID counter as exact */
LWLockAcquire(OidGenLock, LW_EXCLUSIVE);
ShmemVariableCache->nextOid = checkPoint.nextOid;
ShmemVariableCache->oidCount = 0;
LWLockRelease(OidGenLock);
MultiXactAdvanceNextMXact(checkPoint.nextMulti,
checkPoint.nextMultiOffset);
if (TransactionIdPrecedes(ShmemVariableCache->oldestXid,

View File

@ -654,17 +654,28 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running)
running->latestCompletedXid))
ShmemVariableCache->latestCompletedXid = running->latestCompletedXid;
/* nextXid must be beyond any observed xid */
Assert(TransactionIdIsNormal(ShmemVariableCache->latestCompletedXid));
LWLockRelease(ProcArrayLock);
/*
* ShmemVariableCache->nextXid must be beyond any observed xid.
*
* We don't expect anyone else to modify nextXid, hence we don't need to
* hold a lock while examining it. We still acquire the lock to modify
* it, though.
*/
nextXid = latestObservedXid;
TransactionIdAdvance(nextXid);
if (TransactionIdFollows(nextXid, ShmemVariableCache->nextXid))
{
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
ShmemVariableCache->nextXid = nextXid;
LWLockRelease(XidGenLock);
}
Assert(TransactionIdIsNormal(ShmemVariableCache->latestCompletedXid));
Assert(TransactionIdIsValid(ShmemVariableCache->nextXid));
LWLockRelease(ProcArrayLock);
KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
if (standbyState == STANDBY_SNAPSHOT_READY)
elog(trace_recovery(DEBUG1), "recovery snapshots are now enabled");
@ -1690,6 +1701,13 @@ GetOldestActiveTransactionId(void)
LWLockAcquire(ProcArrayLock, LW_SHARED);
/*
* It's okay to read nextXid without acquiring XidGenLock because (1) we
* assume TransactionIds can be read atomically and (2) we don't care if
* we get a slightly stale value. It can't be very stale anyway, because
* the LWLockAcquire above will have done any necessary memory
* interlocking.
*/
oldestRunningXid = ShmemVariableCache->nextXid;
/*
@ -2609,7 +2627,9 @@ RecordKnownAssignedTransactionIds(TransactionId xid)
/* ShmemVariableCache->nextXid must be beyond any observed xid */
next_expected_xid = latestObservedXid;
TransactionIdAdvance(next_expected_xid);
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
ShmemVariableCache->nextXid = next_expected_xid;
LWLockRelease(XidGenLock);
}
}