Truncate predicate lock manager's SLRU lazily at checkpoint. That's safer

than doing it aggressively whenever the tail-XID pointer is advanced, because
this way we don't need to do it while holding SerializableXactHashLock.

This also fixes bug #5915 spotted by YAMAMOTO Takashi, and removes an
obsolete comment spotted by Kevin Grittner.
This commit is contained in:
Heikki Linnakangas 2011-03-08 12:07:29 +02:00
parent 804d13adfd
commit 4cd3fb6e12
3 changed files with 69 additions and 52 deletions

View File

@ -48,6 +48,7 @@
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/pmsignal.h"
#include "storage/predicate.h"
#include "storage/procarray.h"
#include "storage/reinit.h"
#include "storage/smgr.h"
@ -7875,6 +7876,7 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
CheckPointCLOG();
CheckPointSUBTRANS();
CheckPointMultiXact();
CheckPointPredicate();
CheckPointRelationMap();
CheckPointBuffers(flags); /* performs all required fsyncs */
/* We deliberately delay 2PC checkpointing as long as possible */

View File

@ -323,10 +323,9 @@ static SlruCtlData OldSerXidSlruCtlData;
typedef struct OldSerXidControlData
{
int headPage;
int tailSegment;
TransactionId headXid;
TransactionId tailXid;
int headPage; /* newest initialized page */
TransactionId headXid; /* newest valid Xid in the SLRU */
TransactionId tailXid; /* oldest xmin we might be interested in */
bool warningIssued;
} OldSerXidControlData;
@ -711,7 +710,6 @@ OldSerXidInit(void)
* Set control information to reflect empty SLRU.
*/
oldSerXidControl->headPage = -1;
oldSerXidControl->tailSegment = -1;
oldSerXidControl->headXid = InvalidTransactionId;
oldSerXidControl->tailXid = InvalidTransactionId;
oldSerXidControl->warningIssued = false;
@ -722,10 +720,6 @@ OldSerXidInit(void)
* Record a committed read write serializable xid and the minimum
* commitSeqNo of any transactions to which this xid had a rw-conflict out.
* A zero seqNo means that there were no conflicts out from xid.
*
* The return value is normally false -- true means that we're about to
* wrap around our space for tracking these xids, so the caller might want
* to take action to prevent that.
*/
static void
OldSerXidAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo)
@ -733,7 +727,7 @@ OldSerXidAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo)
TransactionId tailXid;
int targetPage;
int slotno;
int page;
int firstZeroPage;
int xidSpread;
bool isNewPage;
@ -745,30 +739,34 @@ OldSerXidAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo)
/*
* If no serializable transactions are active, there shouldn't be anything
* to push out to this SLRU. Hitting this assert would mean there's
* to push out to the SLRU. Hitting this assert would mean there's
* something wrong with the earlier cleanup logic.
*/
tailXid = oldSerXidControl->tailXid;
Assert(TransactionIdIsValid(tailXid));
/*
* If the SLRU is currently unused, zero out the whole active region
* from tailXid to headXid before taking it into use. Otherwise zero
* out only any new pages that enter the tailXid-headXid range as we
* advance headXid.
*/
if (oldSerXidControl->headPage < 0)
{
page = OldSerXidPage(tailXid);
oldSerXidControl->tailSegment = OldSerXidSegment(page);
page = oldSerXidControl->tailSegment * OLDSERXID_ENTRIESPERPAGE;
firstZeroPage = OldSerXidPage(tailXid);
isNewPage = true;
}
else
{
page = OldSerXidNextPage(oldSerXidControl->headPage);
isNewPage = OldSerXidPagePrecedesLogically(oldSerXidControl->headPage, targetPage);
firstZeroPage = OldSerXidNextPage(oldSerXidControl->headPage);
isNewPage = OldSerXidPagePrecedesLogically(oldSerXidControl->headPage,
targetPage);
}
if (!TransactionIdIsValid(oldSerXidControl->headXid)
|| TransactionIdFollows(xid, oldSerXidControl->headXid))
oldSerXidControl->headXid = xid;
if (oldSerXidControl->headPage < 0
|| OldSerXidPagePrecedesLogically(oldSerXidControl->headPage, targetPage))
if (isNewPage)
oldSerXidControl->headPage = targetPage;
xidSpread = (((uint32) xid) - ((uint32) tailXid));
@ -788,10 +786,10 @@ OldSerXidAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo)
if (isNewPage)
{
/* Initialize intervening pages. */
while (page != targetPage)
while (firstZeroPage != targetPage)
{
(void) SimpleLruZeroPage(OldSerXidSlruCtl, page);
page = OldSerXidNextPage(page);
(void) SimpleLruZeroPage(OldSerXidSlruCtl, firstZeroPage);
firstZeroPage = OldSerXidNextPage(firstZeroPage);
}
slotno = SimpleLruZeroPage(OldSerXidSlruCtl, targetPage);
}
@ -846,31 +844,24 @@ OldSerXidGetMinConflictCommitSeqNo(TransactionId xid)
/*
* Call this whenever there is a new xmin for active serializable
* transactions. We don't need to keep information on transactions which
* preceed that. InvalidTransactionId means none active, so everything in
* the SLRU should be discarded.
* precede that. InvalidTransactionId means none active, so everything in
* the SLRU can be discarded.
*/
static void
OldSerXidSetActiveSerXmin(TransactionId xid)
{
int newTailPage;
int newTailSegment;
LWLockAcquire(OldSerXidLock, LW_EXCLUSIVE);
/*
* When no sxacts are active, nothing overlaps, set the xid values to
* invalid to show that there are no valid entries. Don't clear the
* segment/page information, though. A new xmin might still land in an
* existing segment, and we don't want to repeatedly delete and re-create
* the same segment file.
* invalid to show that there are no valid entries. Don't clear headPage,
* though. A new xmin might still land on that page, and we don't want
* to repeatedly zero out the same page.
*/
if (!TransactionIdIsValid(xid))
{
if (TransactionIdIsValid(oldSerXidControl->tailXid))
{
oldSerXidControl->headXid = InvalidTransactionId;
oldSerXidControl->tailXid = InvalidTransactionId;
}
oldSerXidControl->tailXid = InvalidTransactionId;
oldSerXidControl->headXid = InvalidTransactionId;
LWLockRelease(OldSerXidLock);
return;
}
@ -886,7 +877,9 @@ OldSerXidSetActiveSerXmin(TransactionId xid)
Assert(oldSerXidControl->headPage < 0);
if (!TransactionIdIsValid(oldSerXidControl->tailXid)
|| TransactionIdPrecedes(xid, oldSerXidControl->tailXid))
{
oldSerXidControl->tailXid = xid;
}
LWLockRelease(OldSerXidLock);
return;
}
@ -896,37 +889,57 @@ OldSerXidSetActiveSerXmin(TransactionId xid)
oldSerXidControl->tailXid = xid;
/* Exit quickly if there are no segments active. */
LWLockRelease(OldSerXidLock);
}
/*
* Perform a checkpoint --- either during shutdown, or on-the-fly
*
* We don't have any data that needs to survive a restart, but this is a
* convenient place to truncate the SLRU.
*/
void
CheckPointPredicate(void)
{
int tailPage;
LWLockAcquire(OldSerXidLock, LW_EXCLUSIVE);
/* Exit quickly if the SLRU is currently not in use. */
if (oldSerXidControl->headPage < 0)
{
LWLockRelease(OldSerXidLock);
return;
}
newTailPage = OldSerXidPage(xid);
newTailSegment = OldSerXidSegment(newTailPage);
/* Exit quickly if we're still on the same segment. */
if (newTailSegment == oldSerXidControl->tailSegment)
if (TransactionIdIsValid(oldSerXidControl->tailXid))
{
LWLockRelease(OldSerXidLock);
return;
/* We can truncate the SLRU up to the page containing tailXid */
tailPage = OldSerXidPage(oldSerXidControl->tailXid);
}
oldSerXidControl->tailSegment = newTailSegment;
/* See if that has cleared the last segment. */
if (OldSerXidPagePrecedesLogically(oldSerXidControl->headPage,
newTailSegment * SLRU_PAGES_PER_SEGMENT))
else
{
oldSerXidControl->headXid = InvalidTransactionId;
/*
* The SLRU is no longer needed. Truncate everything but the last
* page. We don't dare to touch the last page in case the SLRU is
* taken back to use, and the new tail falls on the same page.
*/
tailPage = oldSerXidControl->headPage;
oldSerXidControl->headPage = -1;
oldSerXidControl->tailSegment = -1;
}
LWLockRelease(OldSerXidLock);
SimpleLruTruncate(OldSerXidSlruCtl, newTailPage);
/*
* Flush dirty SLRU pages to disk
*
* This is not actually necessary from a correctness point of view. We do
* it merely as a debugging aid.
*/
SimpleLruFlush(OldSerXidSlruCtl, true);
/* Truncate away pages that are no longer required */
SimpleLruTruncate(OldSerXidSlruCtl, tailPage);
}
/*------------------------------------------------------------------------*/

View File

@ -36,6 +36,8 @@ extern int max_predicate_locks_per_xact;
extern void InitPredicateLocks(void);
extern Size PredicateLockShmemSize(void);
extern void CheckPointPredicate(void);
/* predicate lock reporting */
extern bool PageIsPredicateLocked(const Relation relation, const BlockNumber blkno);