Modify sinval so that InvalidateSharedInvalid() does not hold

the SInval spinlock while it is calling the passed invalFunction or
resetFunction.  This is necessary to avoid deadlock with lmgr change;
InvalidateSharedInvalid can be called recursively now.  It should be
a good performance improvement anyway --- holding a spinlock for more
than a very short interval is a no-no.
This commit is contained in:
Tom Lane 1999-09-04 18:36:45 +00:00
parent ae01c7f5bb
commit 8add6d71cf
3 changed files with 114 additions and 93 deletions

View File

@ -7,7 +7,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinval.c,v 1.16 1999/07/15 22:39:49 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinval.c,v 1.17 1999/09/04 18:36:45 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -21,9 +21,9 @@
#include "storage/sinval.h" #include "storage/sinval.h"
#include "storage/sinvaladt.h" #include "storage/sinvaladt.h"
extern SISeg *shmInvalBuffer; /* the shared buffer segment, set by */ extern SISeg *shmInvalBuffer; /* the shared buffer segment, set by
* SISegmentAttach()
/* SISegmentAttach() */ */
extern BackendId MyBackendId; extern BackendId MyBackendId;
extern BackendTag MyBackendTag; extern BackendTag MyBackendTag;
@ -127,21 +127,20 @@ RegisterSharedInvalid(int cacheId, /* XXX */
ItemPointerSetInvalid(&newInvalid.pointerData); ItemPointerSetInvalid(&newInvalid.pointerData);
SpinAcquire(SInvalLock); SpinAcquire(SInvalLock);
if (!SISetDataEntry(shmInvalBuffer, &newInvalid)) while (!SISetDataEntry(shmInvalBuffer, &newInvalid))
{ {
/* buffer full */ /* buffer full */
/* release a message, mark process cache states to be invalid */ /* release a message, mark process cache states to be invalid */
SISetProcStateInvalid(shmInvalBuffer); SISetProcStateInvalid(shmInvalBuffer);
if (!SIDelDataEntry(shmInvalBuffer)) if (!SIDelDataEntries(shmInvalBuffer, 1))
{ {
/* inconsistent buffer state -- shd never happen */ /* inconsistent buffer state -- shd never happen */
SpinRelease(SInvalLock); SpinRelease(SInvalLock);
elog(FATAL, "RegisterSharedInvalid: inconsistent buffer state"); elog(FATAL, "RegisterSharedInvalid: inconsistent buffer state");
} }
/* write again */ /* loop around to try write again */
SISetDataEntry(shmInvalBuffer, &newInvalid);
} }
SpinRelease(SInvalLock); SpinRelease(SInvalLock);
} }
@ -157,13 +156,41 @@ RegisterSharedInvalid(int cacheId, /* XXX */
/* should be called by a backend */ /* should be called by a backend */
/****************************************************************************/ /****************************************************************************/
void void
InvalidateSharedInvalid(void (*invalFunction) (), InvalidateSharedInvalid(void (*invalFunction) (),
void (*resetFunction) ()) void (*resetFunction) ())
{ {
SpinAcquire(SInvalLock); SharedInvalidData data;
SIReadEntryData(shmInvalBuffer, MyBackendId, int getResult;
invalFunction, resetFunction); bool gotMessage = false;
SIDelExpiredDataEntries(shmInvalBuffer); for (;;)
SpinRelease(SInvalLock); {
SpinAcquire(SInvalLock);
getResult = SIGetDataEntry(shmInvalBuffer, MyBackendId, &data);
SpinRelease(SInvalLock);
if (getResult == 0)
break; /* nothing more to do */
if (getResult < 0)
{
/* got a reset message */
elog(NOTICE, "InvalidateSharedInvalid: cache state reset");
resetFunction();
}
else
{
/* got a normal data message */
invalFunction(data.cacheId,
data.hashIndex,
&data.pointerData);
}
gotMessage = true;
}
/* If we got any messages, try to release dead messages */
if (gotMessage)
{
SpinAcquire(SInvalLock);
SIDelExpiredDataEntries(shmInvalBuffer);
SpinRelease(SInvalLock);
}
} }

View File

@ -7,7 +7,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.23 1999/07/17 20:17:44 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.24 1999/09/04 18:36:45 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -450,20 +450,11 @@ SIGetLastDataEntry(SISeg *segP)
/************************************************************************/ /************************************************************************/
/* SIGetNextDataEntry(segP, offset) returns next data entry */ /* SIGetNextDataEntry(segP, offset) returns next data entry */
/************************************************************************/ /************************************************************************/
static SISegEntry * #define SIGetNextDataEntry(segP,offset) \
SIGetNextDataEntry(SISeg *segP, Offset offset) (((offset) == InvalidOffset) ? (SISegEntry *) NULL : \
{ (SISegEntry *) ((Pointer) (segP) + \
SISegEntry *eP; (segP)->startEntrySection + \
(Offset) (offset)))
if (offset == InvalidOffset)
return NULL;
eP = (SISegEntry *) ((Pointer) segP +
SIGetStartEntrySection(segP) +
offset);
return eP;
}
/************************************************************************/ /************************************************************************/
/* SIGetNthDataEntry(segP, n) returns the n-th data entry in chain */ /* SIGetNthDataEntry(segP, n) returns the n-th data entry in chain */
@ -566,31 +557,38 @@ SIDecProcLimit(SISeg *segP, int num)
/************************************************************************/ /************************************************************************/
/* SIDelDataEntry(segP) - free the FIRST entry */ /* SIDelDataEntries(segP, n) - free the FIRST n entries */
/************************************************************************/ /************************************************************************/
bool bool
SIDelDataEntry(SISeg *segP) SIDelDataEntries(SISeg *segP, int n)
{ {
SISegEntry *e1P; int i;
if (!SIDecNumEntries(segP, 1)) if (n <= 0)
return false;
if (!SIDecNumEntries(segP, n))
{ {
/* no entries in buffer */ /* not that many entries in buffer */
return false; return false;
} }
e1P = SIGetFirstDataEntry(segP); for (i = 1; i <= n; i++)
SISetStartEntryChain(segP, e1P->next);
if (SIGetStartEntryChain(segP) == InvalidOffset)
{ {
/* it was the last entry */ SISegEntry *e1P = SIGetFirstDataEntry(segP);
SISetEndEntryChain(segP, InvalidOffset); SISetStartEntryChain(segP, e1P->next);
if (SIGetStartEntryChain(segP) == InvalidOffset)
{
/* it was the last entry */
SISetEndEntryChain(segP, InvalidOffset);
}
/* free the entry */
e1P->isfree = true;
e1P->next = SIGetStartFreeSpace(segP);
SISetStartFreeSpace(segP, SIEntryOffset(segP, e1P));
} }
/* free the entry */
e1P->isfree = true; SIDecProcLimit(segP, n);
e1P->next = SIGetStartFreeSpace(segP);
SISetStartFreeSpace(segP, SIEntryOffset(segP, e1P));
SIDecProcLimit(segP, 1);
return true; return true;
} }
@ -621,51 +619,51 @@ SISetProcStateInvalid(SISeg *segP)
} }
/************************************************************************/ /************************************************************************/
/* SIReadEntryData(segP, backendId, function) */ /* SIGetDataEntry(segP, backendId, data) */
/* - marks messages to be read by id */ /* get next SI message for specified backend, if there is one */
/* and executes function */ /* */
/* Possible return values: */
/* 0: no SI message available */
/* 1: next SI message has been extracted into *data */
/* (there may be more messages available after this one!) */
/* -1: SI reset message extracted */
/************************************************************************/ /************************************************************************/
void int
SIReadEntryData(SISeg *segP, SIGetDataEntry(SISeg *segP, int backendId,
int backendId, SharedInvalidData *data)
void (*invalFunction) (),
void (*resetFunction) ())
{ {
int i = 0; SISegEntry *msg;
SISegEntry *data;
Assert(segP->procState[backendId - 1].tag == MyBackendTag); Assert(segP->procState[backendId - 1].tag == MyBackendTag);
if (!segP->procState[backendId - 1].resetState) if (segP->procState[backendId - 1].resetState)
{ {
/* invalidate data, but only those, you have not seen yet !! */
/* therefore skip read messages */
data = SIGetNthDataEntry(segP,
SIGetProcStateLimit(segP, backendId - 1) + 1);
while (data != NULL)
{
i++;
segP->procState[backendId - 1].limit++; /* one more message read */
invalFunction(data->entryData.cacheId,
data->entryData.hashIndex,
&data->entryData.pointerData);
data = SIGetNextDataEntry(segP, data->next);
}
/* SIDelExpiredDataEntries(segP); */
}
else
{
/* backend must not read messages, its own state has to be reset */
elog(NOTICE, "SIReadEntryData: cache state reset");
resetFunction(); /* XXXX call it here, parameters? */
/* new valid state--mark all messages "read" */ /* new valid state--mark all messages "read" */
segP->procState[backendId - 1].resetState = false; segP->procState[backendId - 1].resetState = false;
segP->procState[backendId - 1].limit = SIGetNumEntries(segP); segP->procState[backendId - 1].limit = SIGetNumEntries(segP);
return -1;
} }
/* check whether we can remove dead messages */
if (i > MAXNUMMESSAGES) /* Get next message for this backend, if any */
elog(FATAL, "SIReadEntryData: Invalid segment state");
/* This is fairly inefficient if there are many messages,
* but normally there should not be...
*/
msg = SIGetNthDataEntry(segP,
SIGetProcStateLimit(segP, backendId - 1) + 1);
if (msg == NULL)
return 0; /* nothing to read */
*data = msg->entryData; /* return contents of message */
segP->procState[backendId - 1].limit++; /* one more message read */
/* There may be other backends that haven't read the message,
* so we cannot delete it here.
* SIDelExpiredDataEntries() should be called to remove dead messages.
*/
return 1; /* got a message */
} }
/************************************************************************/ /************************************************************************/
@ -688,15 +686,12 @@ SIDelExpiredDataEntries(SISeg *segP)
min = h; min = h;
} }
} }
if (min != 9999999) if (min < 9999999 && min > 0)
{ {
/* we can remove min messages */ /* we can remove min messages */
for (i = 1; i <= min; i++) /* this adjusts also the state limits! */
{ if (!SIDelDataEntries(segP, min))
/* this adjusts also the state limits! */ elog(FATAL, "SIDelExpiredDataEntries: Invalid segment state");
if (!SIDelDataEntry(segP))
elog(FATAL, "SIDelExpiredDataEntries: Invalid segment state");
}
} }
} }
@ -784,8 +779,7 @@ SISegmentAttach(IpcMemoryId shmid)
if (shmInvalBuffer == IpcMemAttachFailed) if (shmInvalBuffer == IpcMemAttachFailed)
{ {
/* XXX use validity function */ /* XXX use validity function */
elog(NOTICE, "SISegmentAttach: Could not attach segment"); elog(FATAL, "SISegmentAttach: Could not attach segment: %m");
elog(FATAL, "SISegmentAttach: %m");
} }
} }

View File

@ -6,7 +6,7 @@
* *
* Copyright (c) 1994, Regents of the University of California * Copyright (c) 1994, Regents of the University of California
* *
* $Id: sinvaladt.h,v 1.16 1999/07/16 17:07:38 momjian Exp $ * $Id: sinvaladt.h,v 1.17 1999/09/04 18:36:44 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -128,9 +128,9 @@ extern int SISegmentInit(bool killExistingSegment, IPCKey key,
extern bool SISetDataEntry(SISeg *segP, SharedInvalidData *data); extern bool SISetDataEntry(SISeg *segP, SharedInvalidData *data);
extern void SISetProcStateInvalid(SISeg *segP); extern void SISetProcStateInvalid(SISeg *segP);
extern bool SIDelDataEntry(SISeg *segP); extern int SIGetDataEntry(SISeg *segP, int backendId,
extern void SIReadEntryData(SISeg *segP, int backendId, SharedInvalidData *data);
void (*invalFunction) (), void (*resetFunction) ()); extern bool SIDelDataEntries(SISeg *segP, int n);
extern void SIDelExpiredDataEntries(SISeg *segP); extern void SIDelExpiredDataEntries(SISeg *segP);
#endif /* SINVALADT_H */ #endif /* SINVALADT_H */