diff --git a/src/backend/storage/ipc/sinval.c b/src/backend/storage/ipc/sinval.c index e993cef74a..c1a557033b 100644 --- a/src/backend/storage/ipc/sinval.c +++ b/src/backend/storage/ipc/sinval.c @@ -7,7 +7,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinval.c,v 1.17 1999/09/04 18:36:45 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinval.c,v 1.18 1999/09/06 19:37:38 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -21,12 +21,6 @@ #include "storage/sinval.h" #include "storage/sinvaladt.h" -extern SISeg *shmInvalBuffer; /* the shared buffer segment, set by - * SISegmentAttach() - */ -extern BackendId MyBackendId; -extern BackendTag MyBackendTag; - SPINLOCK SInvalLock = (SPINLOCK) NULL; /****************************************************************************/ @@ -39,11 +33,6 @@ CreateSharedInvalidationState(IPCKey key, int maxBackends) { int status; - /* - * REMOVED SISyncKill(IPCKeyGetSIBufferMemorySemaphoreKey(key)); - * SISyncInit(IPCKeyGetSIBufferMemorySemaphoreKey(key)); - */ - /* SInvalLock gets set in spin.c, during spinlock init */ status = SISegmentInit(true, IPCKeyGetSIBufferMemoryBlock(key), maxBackends); @@ -53,9 +42,9 @@ CreateSharedInvalidationState(IPCKey key, int maxBackends) } /****************************************************************************/ -/* AttachSharedInvalidationState(key) Attach a buffer segment */ +/* AttachSharedInvalidationState(key) Attach to existing buffer segment */ /* */ -/* should be called only by the POSTMASTER */ +/* should be called by each backend during startup */ /****************************************************************************/ void AttachSharedInvalidationState(IPCKey key) @@ -74,6 +63,11 @@ AttachSharedInvalidationState(IPCKey key) elog(FATAL, "AttachSharedInvalidationState: failed segment init"); } +/* + * InitSharedInvalidationState + * Initialize new backend's state info in buffer segment. + * Must be called after AttachSharedInvalidationState(). + */ void InitSharedInvalidationState(void) { @@ -88,24 +82,19 @@ InitSharedInvalidationState(void) /* * RegisterSharedInvalid - * Returns a new local cache invalidation state containing a new entry. + * Add a shared-cache-invalidation message to the global SI message queue. * * Note: * Assumes hash index is valid. * Assumes item pointer is valid. */ -/****************************************************************************/ -/* RegisterSharedInvalid(cacheId, hashIndex, pointer) */ -/* */ -/* register a message in the buffer */ -/* should be called by a backend */ -/****************************************************************************/ void RegisterSharedInvalid(int cacheId, /* XXX */ Index hashIndex, ItemPointer pointer) { - SharedInvalidData newInvalid; + SharedInvalidData newInvalid; + bool insertOK; /* * This code has been hacked to accept two types of messages. This @@ -127,34 +116,16 @@ RegisterSharedInvalid(int cacheId, /* XXX */ ItemPointerSetInvalid(&newInvalid.pointerData); SpinAcquire(SInvalLock); - while (!SISetDataEntry(shmInvalBuffer, &newInvalid)) - { - /* buffer full */ - /* release a message, mark process cache states to be invalid */ - SISetProcStateInvalid(shmInvalBuffer); - - if (!SIDelDataEntries(shmInvalBuffer, 1)) - { - /* inconsistent buffer state -- shd never happen */ - SpinRelease(SInvalLock); - elog(FATAL, "RegisterSharedInvalid: inconsistent buffer state"); - } - - /* loop around to try write again */ - } + insertOK = SIInsertDataEntry(shmInvalBuffer, &newInvalid); SpinRelease(SInvalLock); + if (! insertOK) + elog(NOTICE, "RegisterSharedInvalid: SI buffer overflow"); } /* * InvalidateSharedInvalid - * Processes all entries in a shared cache invalidation state. + * Process shared-cache-invalidation messages waiting for this backend */ -/****************************************************************************/ -/* InvalidateSharedInvalid(invalFunction, resetFunction) */ -/* */ -/* invalidate a message in the buffer (read and clean up) */ -/* should be called by a backend */ -/****************************************************************************/ void InvalidateSharedInvalid(void (*invalFunction) (), void (*resetFunction) ()) diff --git a/src/backend/storage/ipc/sinvaladt.c b/src/backend/storage/ipc/sinvaladt.c index 2e64d027f3..99426693cd 100644 --- a/src/backend/storage/ipc/sinvaladt.c +++ b/src/backend/storage/ipc/sinvaladt.c @@ -7,7 +7,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.24 1999/09/04 18:36:45 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.25 1999/09/06 19:37:38 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -16,648 +16,313 @@ #include "postgres.h" +#include "miscadmin.h" #include "storage/backendid.h" #include "storage/lmgr.h" +#include "storage/sinvaladt.h" #include "utils/trace.h" -/* ---------------- - * global variable notes - * - * SharedInvalidationSemaphore - * - * shmInvalBuffer - * the shared buffer segment, set by SISegmentAttach() - * - * MyBackendId - * might be removed later, used only for - * debugging in debug routines (end of file) - * - * SIDbId - * identification of buffer (disappears) - * - * SIRelId \ - * SIDummyOid \ identification of buffer - * SIXidData / - * SIXid / - * - * XXX This file really needs to be cleaned up. We switched to using - * spinlocks to protect critical sections (as opposed to using fake - * relations and going through the lock manager) and some of the old - * cruft was 'ifdef'ed out, while other parts (now unused) are still - * compiled into the system. -mer 5/24/92 - * ---------------- - */ -#ifdef HAS_TEST_AND_SET -int SharedInvalidationLockId; - -#else -IpcSemaphoreId SharedInvalidationSemaphore; - -#endif - SISeg *shmInvalBuffer; -extern BackendId MyBackendId; -static void CleanupInvalidationState(int status, SISeg *segInOutP); -static BackendId SIAssignBackendId(SISeg *segInOutP, BackendTag backendTag); -static int SIGetNumEntries(SISeg *segP); +static void SISegmentAttach(IpcMemoryId shmid); +static void SISegInit(SISeg *segP, int maxBackends); +static void CleanupInvalidationState(int status, SISeg *segP); +static void SISetProcStateInvalid(SISeg *segP); -/************************************************************************/ -/* SISetActiveProcess(segP, backendId) set the backend status active */ -/* should be called only by the postmaster when creating a backend */ -/************************************************************************/ -/* XXX I suspect that the segP parameter is extraneous. -hirohama */ -static void -SISetActiveProcess(SISeg *segInOutP, BackendId backendId) +/* + * SISegmentInit + * Create a new SI memory segment, or attach to an existing one + * + * This is called with createNewSegment = true by the postmaster (or by + * a standalone backend), and subsequently with createNewSegment = false + * by backends started by the postmaster. + * + * Note: maxBackends param is only valid when createNewSegment is true + */ +int +SISegmentInit(bool createNewSegment, IPCKey key, int maxBackends) { - /* mark all messages as read */ + int segSize; + IpcMemoryId shmId; - /* Assert(segP->procState[backendId - 1].tag == MyBackendTag); */ + if (createNewSegment) + { + /* Kill existing segment, if any */ + IpcMemoryKill(key); - segInOutP->procState[backendId - 1].resetState = false; - segInOutP->procState[backendId - 1].limit = SIGetNumEntries(segInOutP); + /* Figure space needed. + * Note sizeof(SISeg) includes the first ProcState entry. + */ + segSize = sizeof(SISeg) + sizeof(ProcState) * (maxBackends - 1); + + /* Get a shared segment */ + shmId = IpcMemoryCreate(key, segSize, IPCProtection); + if (shmId < 0) + { + perror("SISegmentInit: segment create failed"); + return -1; /* an error */ + } + + /* Attach to the shared cache invalidation segment */ + /* sets the global variable shmInvalBuffer */ + SISegmentAttach(shmId); + + /* Init shared memory contents */ + SISegInit(shmInvalBuffer, maxBackends); + } + else + { + /* find existing segment */ + shmId = IpcMemoryIdGet(key, 0); + if (shmId < 0) + { + perror("SISegmentInit: segment get failed"); + return -1; /* an error */ + } + + /* Attach to the shared cache invalidation segment */ + /* sets the global variable shmInvalBuffer */ + SISegmentAttach(shmId); + } + return 1; } -/****************************************************************************/ -/* SIBackendInit() initializes a backend to operate on the buffer */ -/****************************************************************************/ -int -SIBackendInit(SISeg *segInOutP) +/* + * SISegmentAttach + * Attach to specified shared memory segment + */ +static void +SISegmentAttach(IpcMemoryId shmid) { - LockRelId LtCreateRelId(); - TransactionId LMITransactionIdCopy(); + shmInvalBuffer = (SISeg *) IpcMemoryAttach(shmid); + + if (shmInvalBuffer == IpcMemAttachFailed) + { + /* XXX use validity function */ + elog(FATAL, "SISegmentAttach: Could not attach segment: %m"); + } +} + +/* + * SISegInit + * Initialize contents of a new shared memory sinval segment + */ +static void +SISegInit(SISeg *segP, int maxBackends) +{ + int i; + + /* Clear message counters, save size of procState array */ + segP->minMsgNum = 0; + segP->maxMsgNum = 0; + segP->maxBackends = maxBackends; + + /* The buffer[] array is initially all unused, so we need not fill it */ + + /* Mark all backends inactive */ + for (i = 0; i < maxBackends; i++) + { + segP->procState[i].nextMsgNum = -1; /* inactive */ + segP->procState[i].resetState = false; + segP->procState[i].tag = InvalidBackendTag; + } +} + +/* + * SIBackendInit + * Initialize a new backend to operate on the sinval buffer + * + * NB: this routine, and all following ones, must be executed with the + * SInvalLock spinlock held, since there may be multiple backends trying + * to access the buffer. + */ +int +SIBackendInit(SISeg *segP) +{ + Index index; + ProcState *stateP = NULL; Assert(MyBackendTag > 0); - MyBackendId = SIAssignBackendId(segInOutP, MyBackendTag); - if (MyBackendId == InvalidBackendTag) + /* Check for duplicate backend tags (should never happen) */ + for (index = 0; index < segP->maxBackends; index++) + { + if (segP->procState[index].tag == MyBackendTag) + elog(FATAL, "SIBackendInit: tag %d already in use", MyBackendTag); + } + + /* Look for a free entry in the procState array */ + for (index = 0; index < segP->maxBackends; index++) + { + if (segP->procState[index].tag == InvalidBackendTag) + { + stateP = &segP->procState[index]; + break; + } + } + + /* elog() with spinlock held is probably not too cool, but these + * conditions should never happen anyway. + */ + if (stateP == NULL) + { + elog(NOTICE, "SIBackendInit: no free procState slot available"); + MyBackendId = InvalidBackendTag; return 0; + } + + MyBackendId = (stateP - &segP->procState[0]) + 1; #ifdef INVALIDDEBUG elog(DEBUG, "SIBackendInit: backend tag %d; backend id %d.", MyBackendTag, MyBackendId); #endif /* INVALIDDEBUG */ - SISetActiveProcess(segInOutP, MyBackendId); - on_shmem_exit(CleanupInvalidationState, (caddr_t) segInOutP); + /* mark myself active, with all extant messages already read */ + stateP->tag = MyBackendTag; + stateP->resetState = false; + stateP->nextMsgNum = segP->maxMsgNum; + + /* register exit routine to mark my entry inactive at exit */ + on_shmem_exit(CleanupInvalidationState, (caddr_t) segP); + return 1; } -/* ---------------- - * SIAssignBackendId - * ---------------- - */ -static BackendId -SIAssignBackendId(SISeg *segInOutP, BackendTag backendTag) -{ - Index index; - ProcState *stateP = NULL; - - for (index = 0; index < segInOutP->maxBackends; index++) - { - if (segInOutP->procState[index].tag == InvalidBackendTag || - segInOutP->procState[index].tag == backendTag) - { - stateP = &segInOutP->procState[index]; - break; - } - - if (!PointerIsValid(stateP) || - (segInOutP->procState[index].resetState && - (!stateP->resetState || - stateP->tag < backendTag)) || - (!stateP->resetState && - (segInOutP->procState[index].limit < - stateP->limit || - stateP->tag < backendTag))) - stateP = &segInOutP->procState[index]; - } - - /* verify that all "procState" entries checked for matching tags */ - - for (index++; index < segInOutP->maxBackends; index++) - { - if (segInOutP->procState[index].tag == backendTag) - elog(FATAL, "SIAssignBackendId: tag %d found twice", backendTag); - } - - Assert(stateP); - - if (stateP->tag != InvalidBackendTag) - { - if (stateP->tag == backendTag) - elog(NOTICE, "SIAssignBackendId: reusing tag %d", backendTag); - else - { - elog(NOTICE, "SIAssignBackendId: discarding tag %d", stateP->tag); - return InvalidBackendTag; - } - } - - stateP->tag = backendTag; - - return 1 + stateP - &segInOutP->procState[0]; -} - - -/************************************************************************/ -/* The following function should be called only by the postmaster !! */ -/************************************************************************/ - -/************************************************************************/ -/* SISetDeadProcess(segP, backendId) set the backend status DEAD */ -/* should be called only by the postmaster when a backend died */ -/************************************************************************/ -static void -SISetDeadProcess(SISeg *segP, int backendId) -{ - /* XXX call me.... */ - - segP->procState[backendId - 1].resetState = false; - segP->procState[backendId - 1].limit = -1; - segP->procState[backendId - 1].tag = InvalidBackendTag; -} - /* * CleanupInvalidationState - * Note: - * This is a temporary hack. ExitBackend should call this instead - * of exit (via on_shmem_exit). + * Mark the current backend as no longer active. + * + * This function is called via on_shmem_exit() during backend shutdown. */ static void -CleanupInvalidationState(int status, /* XXX */ - SISeg *segInOutP) /* XXX style */ +CleanupInvalidationState(int status, + SISeg *segP) { - Assert(PointerIsValid(segInOutP)); + Assert(PointerIsValid(segP)); - SISetDeadProcess(segInOutP, MyBackendId); + /* XXX we probably oughta grab the SInval spinlock for this... + * but I think it is safe not to. + */ + + segP->procState[MyBackendId - 1].nextMsgNum = -1; + segP->procState[MyBackendId - 1].resetState = false; + segP->procState[MyBackendId - 1].tag = InvalidBackendTag; } - -/************************************************************************/ -/* SIComputeSize() - compute size and offsets for SI segment */ -/************************************************************************/ -static void -SIComputeSize(SISegOffsets *oP, int maxBackends) +/* + * SIInsertDataEntry + * Add a new invalidation message to the buffer. + * + * If we are unable to insert the message because the buffer is full, + * then clear the buffer and assert the "reset" flag to each backend. + * This will cause all the backends to discard *all* invalidatable state. + * + * Returns true for normal successful insertion, false if had to reset. + */ +bool +SIInsertDataEntry(SISeg *segP, SharedInvalidData *data) { - int A, - B, - a, - b, - totalSize; + int numMsgs = segP->maxMsgNum - segP->minMsgNum; - A = 0; - /* sizeof(SISeg) includes the first ProcState entry */ - a = sizeof(SISeg) + sizeof(ProcState) * (maxBackends - 1); - a = MAXALIGN(a); /* offset to first data entry */ - b = sizeof(SISegEntry) * MAXNUMMESSAGES; - B = A + a + b; - B = MAXALIGN(B); - totalSize = B - A; - - oP->startSegment = A; - oP->offsetToFirstEntry = a; /* relative to A */ - oP->offsetToEndOfSegment = totalSize; /* relative to A */ -} - - -/************************************************************************/ -/* SISetStartEntrySection(segP, offset) - sets the offset */ -/************************************************************************/ -static void -SISetStartEntrySection(SISeg *segP, Offset offset) -{ - segP->startEntrySection = offset; -} - -/************************************************************************/ -/* SIGetStartEntrySection(segP) - returnss the offset */ -/************************************************************************/ -static Offset -SIGetStartEntrySection(SISeg *segP) -{ - return segP->startEntrySection; -} - - -/************************************************************************/ -/* SISetEndEntrySection(segP, offset) - sets the offset */ -/************************************************************************/ -static void -SISetEndEntrySection(SISeg *segP, Offset offset) -{ - segP->endEntrySection = offset; -} - -/************************************************************************/ -/* SISetEndEntryChain(segP, offset) - sets the offset */ -/************************************************************************/ -static void -SISetEndEntryChain(SISeg *segP, Offset offset) -{ - segP->endEntryChain = offset; -} - -/************************************************************************/ -/* SIGetEndEntryChain(segP) - returnss the offset */ -/************************************************************************/ -static Offset -SIGetEndEntryChain(SISeg *segP) -{ - return segP->endEntryChain; -} - -/************************************************************************/ -/* SISetStartEntryChain(segP, offset) - sets the offset */ -/************************************************************************/ -static void -SISetStartEntryChain(SISeg *segP, Offset offset) -{ - segP->startEntryChain = offset; -} - -/************************************************************************/ -/* SIGetStartEntryChain(segP) - returns the offset */ -/************************************************************************/ -static Offset -SIGetStartEntryChain(SISeg *segP) -{ - return segP->startEntryChain; -} - -/************************************************************************/ -/* SISetNumEntries(segP, num) sets the current nuber of entries */ -/************************************************************************/ -static bool -SISetNumEntries(SISeg *segP, int num) -{ - if (num <= MAXNUMMESSAGES) + /* Is the buffer full? */ + if (numMsgs >= MAXNUMMESSAGES) { - segP->numEntries = num; - return true; + /* Yes, so force reset */ + SISetProcStateInvalid(segP); + return false; } - else - { - return false; /* table full */ - } -} - -/************************************************************************/ -/* SIGetNumEntries(segP) - returns the current nuber of entries */ -/************************************************************************/ -static int -SIGetNumEntries(SISeg *segP) -{ - return segP->numEntries; -} - - -/************************************************************************/ -/* SISetMaxNumEntries(segP, num) sets the maximal number of entries */ -/************************************************************************/ -static bool -SISetMaxNumEntries(SISeg *segP, int num) -{ - if (num <= MAXNUMMESSAGES) - { - segP->maxNumEntries = num; - return true; - } - else - { - return false; /* wrong number */ - } -} - - -/************************************************************************/ -/* SIGetProcStateLimit(segP, i) returns the limit of read messages */ -/************************************************************************/ - -#define SIGetProcStateLimit(segP,i) \ - ((segP)->procState[i].limit) - -/************************************************************************/ -/* SIIncNumEntries(segP, num) increments the current nuber of entries */ -/************************************************************************/ -static bool -SIIncNumEntries(SISeg *segP, int num) -{ /* - * Try to prevent table overflow. When the table is 70% full send a - * SIGUSR2 to the postmaster which will send it back to all the - * backends. This will be handled by Async_NotifyHandler() with a - * StartTransactionCommand() which will flush unread SI entries for - * each backend. dz - 27 Jan 1998 + * Try to prevent table overflow. When the table is 70% full send a + * SIGUSR2 (ordinarily a NOTIFY signal) to the postmaster, which will + * send it back to all the backends. This will force idle backends to + * execute a transaction to look through pg_listener for NOTIFY messages, + * and as a byproduct of the transaction start they will read SI entries. + * + * This should never happen if all the backends are actively executing + * queries, but if a backend is sitting idle then it won't be starting + * transactions and so won't be reading SI entries. + * + * dz - 27 Jan 1998 */ - if (segP->numEntries == (MAXNUMMESSAGES * 70 / 100)) + if (numMsgs == (MAXNUMMESSAGES * 70 / 100) && + IsUnderPostmaster) { TPRINTF(TRACE_VERBOSE, - "SIIncNumEntries: table is 70%% full, signaling postmaster"); + "SIInsertDataEntry: table is 70%% full, signaling postmaster"); kill(getppid(), SIGUSR2); } - if ((segP->numEntries + num) <= MAXNUMMESSAGES) - { - segP->numEntries = segP->numEntries + num; - return true; - } - else - { - return false; /* table full */ - } -} + /* + * Insert new message into proper slot of circular buffer + */ + segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data; + segP->maxMsgNum++; -/************************************************************************/ -/* SIDecNumEntries(segP, num) decrements the current nuber of entries */ -/************************************************************************/ -static bool -SIDecNumEntries(SISeg *segP, int num) -{ - if ((segP->numEntries - num) >= 0) - { - segP->numEntries = segP->numEntries - num; - return true; - } - else - { - return false; /* not enough entries in table */ - } -} - -/************************************************************************/ -/* SISetStartFreeSpace(segP, offset) - sets the offset */ -/************************************************************************/ -static void -SISetStartFreeSpace(SISeg *segP, Offset offset) -{ - segP->startFreeSpace = offset; -} - -/************************************************************************/ -/* SIGetStartFreeSpace(segP) - returns the offset */ -/************************************************************************/ -static Offset -SIGetStartFreeSpace(SISeg *segP) -{ - return segP->startFreeSpace; -} - - - -/************************************************************************/ -/* SIGetFirstDataEntry(segP) returns first data entry */ -/************************************************************************/ -static SISegEntry * -SIGetFirstDataEntry(SISeg *segP) -{ - SISegEntry *eP; - Offset startChain; - - startChain = SIGetStartEntryChain(segP); - - if (startChain == InvalidOffset) - return NULL; - - eP = (SISegEntry *) ((Pointer) segP + - SIGetStartEntrySection(segP) + - startChain); - return eP; -} - - -/************************************************************************/ -/* SIGetLastDataEntry(segP) returns last data entry in the chain */ -/************************************************************************/ -static SISegEntry * -SIGetLastDataEntry(SISeg *segP) -{ - SISegEntry *eP; - Offset endChain; - - endChain = SIGetEndEntryChain(segP); - - if (endChain == InvalidOffset) - return NULL; - - eP = (SISegEntry *) ((Pointer) segP + - SIGetStartEntrySection(segP) + - endChain); - return eP; -} - -/************************************************************************/ -/* SIGetNextDataEntry(segP, offset) returns next data entry */ -/************************************************************************/ -#define SIGetNextDataEntry(segP,offset) \ - (((offset) == InvalidOffset) ? (SISegEntry *) NULL : \ - (SISegEntry *) ((Pointer) (segP) + \ - (segP)->startEntrySection + \ - (Offset) (offset))) - -/************************************************************************/ -/* SIGetNthDataEntry(segP, n) returns the n-th data entry in chain */ -/************************************************************************/ -static SISegEntry * -SIGetNthDataEntry(SISeg *segP, - int n) /* must range from 1 to MaxMessages */ -{ - SISegEntry *eP; - int i; - - if (n <= 0) - return NULL; - - eP = SIGetFirstDataEntry(segP); - for (i = 1; i < n; i++) - { - /* skip one and get the next */ - eP = SIGetNextDataEntry(segP, eP->next); - } - - return eP; -} - -/************************************************************************/ -/* SIEntryOffset(segP, entryP) returns the offset for an pointer */ -/************************************************************************/ -static Offset -SIEntryOffset(SISeg *segP, SISegEntry *entryP) -{ - /* relative to B !! */ - return ((Offset) ((Pointer) entryP - - (Pointer) segP - - SIGetStartEntrySection(segP))); -} - - -/************************************************************************/ -/* SISetDataEntry(segP, data) - sets a message in the segemnt */ -/************************************************************************/ -bool -SISetDataEntry(SISeg *segP, SharedInvalidData *data) -{ - Offset offsetToNewData; - SISegEntry *eP, - *lastP; - - if (!SIIncNumEntries(segP, 1)) - return false; /* no space */ - - /* get a free entry */ - offsetToNewData = SIGetStartFreeSpace(segP); - eP = SIGetNextDataEntry(segP, offsetToNewData); /* it's a free one */ - SISetStartFreeSpace(segP, eP->next); - /* fill it up */ - eP->entryData = *data; - eP->isfree = false; - eP->next = InvalidOffset; - - /* handle insertion point at the end of the chain !! */ - lastP = SIGetLastDataEntry(segP); - if (lastP == NULL) - { - /* there is no chain, insert the first entry */ - SISetStartEntryChain(segP, SIEntryOffset(segP, eP)); - } - else - { - /* there is a last entry in the chain */ - lastP->next = SIEntryOffset(segP, eP); - } - SISetEndEntryChain(segP, SIEntryOffset(segP, eP)); return true; } - -/************************************************************************/ -/* SIDecProcLimit(segP, num) decrements all process limits */ -/************************************************************************/ +/* + * SISetProcStateInvalid + * Flush pending messages from buffer, assert reset flag for each backend + * + * This is used only to recover from SI buffer overflow. + */ static void -SIDecProcLimit(SISeg *segP, int num) -{ - int i; - - for (i = 0; i < segP->maxBackends; i++) - { - /* decrement only, if there is a limit > 0 */ - if (segP->procState[i].limit > 0) - { - segP->procState[i].limit = segP->procState[i].limit - num; - if (segP->procState[i].limit < 0) - { - /* limit was not high enough, reset to zero */ - /* negative means it's a dead backend */ - segP->procState[i].limit = 0; - } - } - } -} - - -/************************************************************************/ -/* SIDelDataEntries(segP, n) - free the FIRST n entries */ -/************************************************************************/ -bool -SIDelDataEntries(SISeg *segP, int n) -{ - int i; - - if (n <= 0) - return false; - - if (!SIDecNumEntries(segP, n)) - { - /* not that many entries in buffer */ - return false; - } - - for (i = 1; i <= n; i++) - { - SISegEntry *e1P = SIGetFirstDataEntry(segP); - SISetStartEntryChain(segP, e1P->next); - if (SIGetStartEntryChain(segP) == InvalidOffset) - { - /* it was the last entry */ - SISetEndEntryChain(segP, InvalidOffset); - } - /* free the entry */ - e1P->isfree = true; - e1P->next = SIGetStartFreeSpace(segP); - SISetStartFreeSpace(segP, SIEntryOffset(segP, e1P)); - } - - SIDecProcLimit(segP, n); - return true; -} - - - -/************************************************************************/ -/* SISetProcStateInvalid(segP) checks and marks a backends state as */ -/* invalid */ -/************************************************************************/ -void SISetProcStateInvalid(SISeg *segP) { int i; + segP->minMsgNum = 0; + segP->maxMsgNum = 0; + for (i = 0; i < segP->maxBackends; i++) { - if (segP->procState[i].limit == 0) + if (segP->procState[i].nextMsgNum >= 0) /* active backend? */ { - /* backend i didn't read any message */ segP->procState[i].resetState = true; - - /* - * XXX signal backend that it has to reset its internal cache - * ? - */ + segP->procState[i].nextMsgNum = 0; } } } -/************************************************************************/ -/* SIGetDataEntry(segP, backendId, data) */ -/* get next SI message for specified backend, if there is one */ -/* */ -/* Possible return values: */ -/* 0: no SI message available */ -/* 1: next SI message has been extracted into *data */ -/* (there may be more messages available after this one!) */ -/* -1: SI reset message extracted */ -/************************************************************************/ +/* + * SIGetDataEntry + * get next SI message for specified backend, if there is one + * + * Possible return values: + * 0: no SI message available + * 1: next SI message has been extracted into *data + * (there may be more messages available after this one!) + * -1: SI reset message extracted + */ int SIGetDataEntry(SISeg *segP, int backendId, SharedInvalidData *data) { - SISegEntry *msg; + ProcState *stateP = & segP->procState[backendId - 1]; - Assert(segP->procState[backendId - 1].tag == MyBackendTag); + Assert(stateP->tag == MyBackendTag); - if (segP->procState[backendId - 1].resetState) + if (stateP->resetState) { - /* new valid state--mark all messages "read" */ - segP->procState[backendId - 1].resetState = false; - segP->procState[backendId - 1].limit = SIGetNumEntries(segP); + /* Force reset. We can say we have dealt with any messages added + * since the reset, as well... + */ + stateP->resetState = false; + stateP->nextMsgNum = segP->maxMsgNum; return -1; } - /* Get next message for this backend, if any */ - - /* This is fairly inefficient if there are many messages, - * but normally there should not be... - */ - msg = SIGetNthDataEntry(segP, - SIGetProcStateLimit(segP, backendId - 1) + 1); - - if (msg == NULL) + if (stateP->nextMsgNum >= segP->maxMsgNum) return 0; /* nothing to read */ - *data = msg->entryData; /* return contents of message */ - - segP->procState[backendId - 1].limit++; /* one more message read */ + /* + * Retrieve message and advance my counter. + */ + *data = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES]; + stateP->nextMsgNum++; /* There may be other backends that haven't read the message, * so we cannot delete it here. @@ -666,9 +331,10 @@ SIGetDataEntry(SISeg *segP, int backendId, return 1; /* got a message */ } -/************************************************************************/ -/* SIDelExpiredDataEntries (segP) - removes irrelevant messages */ -/************************************************************************/ +/* + * SIDelExpiredDataEntries + * Remove messages that have been consumed by all active backends + */ void SIDelExpiredDataEntries(SISeg *segP) { @@ -676,161 +342,34 @@ SIDelExpiredDataEntries(SISeg *segP) i, h; - min = 9999999; + min = segP->maxMsgNum; + if (min == segP->minMsgNum) + return; /* fast path if no messages exist */ + + /* Recompute minMsgNum = minimum of all backends' nextMsgNum */ + for (i = 0; i < segP->maxBackends; i++) { - h = SIGetProcStateLimit(segP, i); + h = segP->procState[i].nextMsgNum; if (h >= 0) { /* backend active */ if (h < min) min = h; } } - if (min < 9999999 && min > 0) + segP->minMsgNum = min; + + /* When minMsgNum gets really large, decrement all message counters + * so as to forestall overflow of the counters. + */ + if (min >= MSGNUMWRAPAROUND) { - /* we can remove min messages */ - /* this adjusts also the state limits! */ - if (!SIDelDataEntries(segP, min)) - elog(FATAL, "SIDelExpiredDataEntries: Invalid segment state"); - } -} - - - -/************************************************************************/ -/* SISegInit(segP) - initializes the segment */ -/************************************************************************/ -static void -SISegInit(SISeg *segP, SISegOffsets *oP, int maxBackends) -{ - int i; - SISegEntry *eP; - - /* set semaphore ids in the segment */ - /* XXX */ - SISetStartEntrySection(segP, oP->offsetToFirstEntry); - SISetEndEntrySection(segP, oP->offsetToEndOfSegment); - SISetStartFreeSpace(segP, 0); - SISetStartEntryChain(segP, InvalidOffset); - SISetEndEntryChain(segP, InvalidOffset); - SISetNumEntries(segP, 0); - SISetMaxNumEntries(segP, MAXNUMMESSAGES); - segP->maxBackends = maxBackends; - for (i = 0; i < segP->maxBackends; i++) - { - segP->procState[i].limit = -1; /* no backend active !! */ - segP->procState[i].resetState = false; - segP->procState[i].tag = InvalidBackendTag; - } - /* construct a chain of free entries */ - for (i = 1; i < MAXNUMMESSAGES; i++) - { - eP = (SISegEntry *) ((Pointer) segP + - SIGetStartEntrySection(segP) + - (i - 1) * sizeof(SISegEntry)); - eP->isfree = true; - eP->next = i * sizeof(SISegEntry); /* relative to B */ - } - /* handle the last free entry separate */ - eP = (SISegEntry *) ((Pointer) segP + - SIGetStartEntrySection(segP) + - (MAXNUMMESSAGES - 1) * sizeof(SISegEntry)); - eP->isfree = true; - eP->next = InvalidOffset; /* it's the end of the chain !! */ -} - - - -/************************************************************************/ -/* SISegmentKill(key) - kill any segment */ -/************************************************************************/ -static void -SISegmentKill(int key) /* the corresponding key for the segment */ -{ - IpcMemoryKill(key); -} - - -/************************************************************************/ -/* SISegmentGet(key, size) - get a shared segment of size */ -/* returns a segment id */ -/************************************************************************/ -static IpcMemoryId -SISegmentGet(int key, /* the corresponding key for the segment */ - int size, /* size of segment in bytes */ - bool create) -{ - IpcMemoryId shmid; - - if (create) - shmid = IpcMemoryCreate(key, size, IPCProtection); - else - shmid = IpcMemoryIdGet(key, size); - return shmid; -} - -/************************************************************************/ -/* SISegmentAttach(shmid) - attach a shared segment with id shmid */ -/************************************************************************/ -static void -SISegmentAttach(IpcMemoryId shmid) -{ - shmInvalBuffer = (struct SISeg *) IpcMemoryAttach(shmid); - if (shmInvalBuffer == IpcMemAttachFailed) - { - /* XXX use validity function */ - elog(FATAL, "SISegmentAttach: Could not attach segment: %m"); - } -} - - -/************************************************************************/ -/* SISegmentInit() initialize SI segment */ -/* */ -/* NB: maxBackends param is only valid when killExistingSegment is true */ -/************************************************************************/ -int -SISegmentInit(bool killExistingSegment, IPCKey key, int maxBackends) -{ - SISegOffsets offsets; - IpcMemoryId shmId; - bool create; - - if (killExistingSegment) - { - /* Kill existing segment */ - /* set semaphore */ - SISegmentKill(key); - - /* Get a shared segment */ - SIComputeSize(&offsets, maxBackends); - create = true; - shmId = SISegmentGet(key, offsets.offsetToEndOfSegment, create); - if (shmId < 0) + segP->minMsgNum -= MSGNUMWRAPAROUND; + segP->maxMsgNum -= MSGNUMWRAPAROUND; + for (i = 0; i < segP->maxBackends; i++) { - perror("SISegmentGet: failed"); - return -1; /* an error */ + if (segP->procState[i].nextMsgNum >= 0) + segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND; } - - /* Attach the shared cache invalidation segment */ - /* sets the global variable shmInvalBuffer */ - SISegmentAttach(shmId); - - /* Init shared memory table */ - SISegInit(shmInvalBuffer, &offsets, maxBackends); } - else - { - /* use an existing segment */ - create = false; - shmId = SISegmentGet(key, 0, create); - if (shmId < 0) - { - perror("SISegmentGet: getting an existent segment failed"); - return -1; /* an error */ - } - /* Attach the shared cache invalidation segment */ - SISegmentAttach(shmId); - } - return 1; } diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h index 87b8538212..8f0f834e0f 100644 --- a/src/include/storage/lock.h +++ b/src/include/storage/lock.h @@ -6,16 +6,16 @@ * * Copyright (c) 1994, Regents of the University of California * - * $Id: lock.h,v 1.33 1999/07/16 17:07:38 momjian Exp $ + * $Id: lock.h,v 1.34 1999/09/06 19:37:37 tgl Exp $ * *------------------------------------------------------------------------- */ #ifndef LOCK_H_ #define LOCK_H_ +#include "storage/ipc.h" #include "storage/itemptr.h" #include "storage/shmem.h" -#include "storage/sinvaladt.h" #include "utils/array.h" extern SPINLOCK LockMgrLock; diff --git a/src/include/storage/sinvaladt.h b/src/include/storage/sinvaladt.h index e008e52d30..b9d349a4c5 100644 --- a/src/include/storage/sinvaladt.h +++ b/src/include/storage/sinvaladt.h @@ -6,7 +6,7 @@ * * Copyright (c) 1994, Regents of the University of California * - * $Id: sinvaladt.h,v 1.17 1999/09/04 18:36:44 tgl Exp $ + * $Id: sinvaladt.h,v 1.18 1999/09/06 19:37:37 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -17,72 +17,50 @@ #include "storage/itemptr.h" /* - * The structure of the shared cache invaidation segment + * The shared cache invalidation manager is responsible for transmitting + * invalidation messages between backends. Any message sent by any backend + * must be delivered to all already-running backends before it can be + * forgotten. * + * Conceptually, the messages are stored in an infinite array, where + * maxMsgNum is the next array subscript to store a submitted message in, + * minMsgNum is the smallest array subscript containing a message not yet + * read by all backends, and we always have maxMsgNum >= minMsgNum. (They + * are equal when there are no messages pending.) For each active backend, + * there is a nextMsgNum pointer indicating the next message it needs to read; + * we have maxMsgNum >= nextMsgNum >= minMsgNum for every backend. + * + * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES + * entries. We translate MsgNum values into circular-buffer indexes by + * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as + * MAXNUMMESSAGES is a constant and a power of 2). As long as maxMsgNum + * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space + * in the buffer. If the buffer does overflow, we reset it to empty and + * force each backend to "reset", ie, discard all its invalidatable state. + * + * We would have problems if the MsgNum values overflow an integer, so + * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND + * from all the MsgNum variables simultaneously. MSGNUMWRAPAROUND can be + * large so that we don't need to do this often. It must be a multiple of + * MAXNUMMESSAGES so that the existing circular-buffer entries don't need + * to be moved when we do it. */ + + /* -A------------- Header info -------------- - criticalSectionSemaphoreId - generalSemaphoreId - startEntrySection (offset a) - endEntrySection (offset a + b) - startFreeSpace (offset relative to B) - startEntryChain (offset relatiev to B) - endEntryChain (offset relative to B) - numEntries - maxNumEntries - maxBackends - procState[maxBackends] --> limit - resetState (bool) -a tag (POSTID) -B------------- Start entry section ------- - SISegEntry --> entryData --> ... (see SharedInvalidData!) - isfree (bool) - next (offset to next entry in chain ) -b .... (dynamically growing down) -C----------------End shared segment ------- + * Configurable parameters. + * + * MAXNUMMESSAGES: max number of shared-inval messages we can buffer. + * Must be a power of 2 for speed. + * + * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow. + * Must be a multiple of MAXNUMMESSAGES. Should be large. + */ -*/ - -/* Parameters (configurable) *******************************************/ -#define MAXNUMMESSAGES 4000 /* maximum number of messages in seg */ - - -#define InvalidOffset 1000000000 /* a invalid offset (End of - * chain) */ - -typedef struct ProcState -{ - int limit; /* the number of read messages */ - bool resetState; /* true, if backend has to reset its state */ - int tag; /* special tag, recieved from the - * postmaster */ -} ProcState; - - -typedef struct SISeg -{ - IpcSemaphoreId criticalSectionSemaphoreId; /* semaphore id */ - IpcSemaphoreId generalSemaphoreId; /* semaphore id */ - Offset startEntrySection; /* (offset a) */ - Offset endEntrySection;/* (offset a + b) */ - Offset startFreeSpace; /* (offset relative to B) */ - Offset startEntryChain;/* (offset relative to B) */ - Offset endEntryChain; /* (offset relative to B) */ - int numEntries; - int maxNumEntries; - int maxBackends; /* size of procState array */ - /* - * We declare procState as 1 entry because C wants a fixed-size array, - * but actually it is maxBackends entries long. - */ - ProcState procState[1]; /* reflects the invalidation state */ - /* - * The entry section begins after the end of the procState array. - * Everything there is controlled by offsets. - */ -} SISeg; +#define MAXNUMMESSAGES 4096 +#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 4096) +/* The content of one shared-invalidation message */ typedef struct SharedInvalidData { int cacheId; /* XXX */ @@ -92,45 +70,53 @@ typedef struct SharedInvalidData typedef SharedInvalidData *SharedInvalid; - -typedef struct SISegEntry +/* Per-backend state in shared invalidation structure */ +typedef struct ProcState { - SharedInvalidData entryData;/* the message data */ - bool isfree; /* entry free? */ - Offset next; /* offset to next entry */ -} SISegEntry; + /* nextMsgNum is -1 in an inactive ProcState array entry. */ + int nextMsgNum; /* next message number to read, or -1 */ + bool resetState; /* true, if backend has to reset its state */ + int tag; /* backend tag received from postmaster */ +} ProcState; -typedef struct SISegOffsets +/* Shared cache invalidation memory segment */ +typedef struct SISeg { - Offset startSegment; /* always 0 (for now) */ - Offset offsetToFirstEntry; /* A + a = B */ - Offset offsetToEndOfSegment; /* A + a + b */ -} SISegOffsets; + /* + * General state information + */ + int minMsgNum; /* oldest message still needed */ + int maxMsgNum; /* next message number to be assigned */ + int maxBackends; /* size of procState array */ + /* + * Circular buffer holding shared-inval messages + */ + SharedInvalidData buffer[MAXNUMMESSAGES]; + /* + * Per-backend state info. + * + * We declare procState as 1 entry because C wants a fixed-size array, + * but actually it is maxBackends entries long. + */ + ProcState procState[1]; /* reflects the invalidation state */ +} SISeg; -/****************************************************************************/ -/* synchronization of the shared buffer access */ -/* access to the buffer is synchronized by the lock manager !! */ -/****************************************************************************/ +extern SISeg *shmInvalBuffer; /* pointer to the shared buffer segment, + * set by SISegmentAttach() + */ -#define SI_LockStartValue 255 -#define SI_SharedLock (-1) -#define SI_ExclusiveLock (-255) - -extern SISeg *shmInvalBuffer; /* * prototypes for functions in sinvaladt.c */ -extern int SIBackendInit(SISeg *segInOutP); -extern int SISegmentInit(bool killExistingSegment, IPCKey key, +extern int SISegmentInit(bool createNewSegment, IPCKey key, int maxBackends); +extern int SIBackendInit(SISeg *segP); -extern bool SISetDataEntry(SISeg *segP, SharedInvalidData *data); -extern void SISetProcStateInvalid(SISeg *segP); +extern bool SIInsertDataEntry(SISeg *segP, SharedInvalidData *data); extern int SIGetDataEntry(SISeg *segP, int backendId, SharedInvalidData *data); -extern bool SIDelDataEntries(SISeg *segP, int n); extern void SIDelExpiredDataEntries(SISeg *segP); #endif /* SINVALADT_H */