diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 918c300dba..4f82016c59 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -15,7 +15,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lwlock.c,v 1.7 2001/12/29 21:30:32 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lwlock.c,v 1.8 2002/01/07 16:33:00 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -30,6 +30,7 @@ typedef struct LWLock { slock_t mutex; /* Protects LWLock and queue of PROCs */ + bool releaseOK; /* T if ok to release waiters */ char exclusive; /* # of exclusive holders (0 or 1) */ int shared; /* # of shared holders (0..MaxBackends) */ PROC *head; /* head of list of waiting PROCs */ @@ -67,9 +68,10 @@ inline static void PRINT_LWDEBUG(const char *where, LWLockId lockid, const volatile LWLock *lock) { if (Trace_lwlocks) - elog(DEBUG, "%s(%d): excl %d shared %d head %p", + elog(DEBUG, "%s(%d): excl %d shared %d head %p rOK %d", where, (int) lockid, - (int) lock->exclusive, lock->shared, lock->head); + (int) lock->exclusive, lock->shared, lock->head, + (int) lock->releaseOK); } inline static void @@ -153,6 +155,7 @@ CreateLWLocks(void) for (id = 0, lock = LWLockArray; id < numLocks; id++, lock++) { SpinLockInit(&lock->mutex); + lock->releaseOK = true; lock->exclusive = 0; lock->shared = 0; lock->head = NULL; @@ -195,7 +198,9 @@ void LWLockAcquire(LWLockId lockid, LWLockMode mode) { volatile LWLock *lock = LWLockArray + lockid; - bool mustwait; + PROC *proc = MyProc; + bool retry = false; + int extraWaits = 0; PRINT_LWDEBUG("LWLockAcquire", lockid, lock); @@ -206,43 +211,62 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode) */ HOLD_INTERRUPTS(); - /* Acquire mutex. Time spent holding mutex should be short! */ - SpinLockAcquire_NoHoldoff(&lock->mutex); - - /* If I can get the lock, do so quickly. */ - if (mode == LW_EXCLUSIVE) + /* + * Loop here to try to acquire lock after each time we are signaled + * by LWLockRelease. + * + * NOTE: it might seem better to have LWLockRelease actually grant us + * the lock, rather than retrying and possibly having to go back to + * sleep. But in practice that is no good because it means a process + * swap for every lock acquisition when two or more processes are + * contending for the same lock. Since LWLocks are normally used to + * protect not-very-long sections of computation, a process needs to + * be able to acquire and release the same lock many times during a + * single CPU time slice, even in the presence of contention. The + * efficiency of being able to do that outweighs the inefficiency of + * sometimes wasting a process dispatch cycle because the lock is not + * free when a released waiter finally gets to run. See pgsql-hackers + * archives for 29-Dec-01. + */ + for (;;) { - if (lock->exclusive == 0 && lock->shared == 0) + bool mustwait; + + /* Acquire mutex. Time spent holding mutex should be short! */ + SpinLockAcquire_NoHoldoff(&lock->mutex); + + /* If retrying, allow LWLockRelease to release waiters again */ + if (retry) + lock->releaseOK = true; + + /* If I can get the lock, do so quickly. */ + if (mode == LW_EXCLUSIVE) { - lock->exclusive++; - mustwait = false; + if (lock->exclusive == 0 && lock->shared == 0) + { + lock->exclusive++; + mustwait = false; + } + else + mustwait = true; } else - mustwait = true; - } - else - { - /* - * If there is someone waiting (presumably for exclusive access), - * queue up behind him even though I could get the lock. This - * prevents a stream of read locks from starving a writer. - */ - if (lock->exclusive == 0 && lock->head == NULL) { - lock->shared++; - mustwait = false; + if (lock->exclusive == 0) + { + lock->shared++; + mustwait = false; + } + else + mustwait = true; } - else - mustwait = true; - } - if (mustwait) - { - /* Add myself to wait queue */ - PROC *proc = MyProc; - int extraWaits = 0; + if (!mustwait) + break; /* got the lock */ /* + * Add myself to wait queue. + * * If we don't have a PROC structure, there's no way to wait. This * should never occur, since MyProc should only be null during * shared memory initialization. @@ -267,9 +291,9 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode) * * Since we share the process wait semaphore with the regular lock * manager and ProcWaitForSignal, and we may need to acquire an - * LWLock while one of those is pending, it is possible that we - * get awakened for a reason other than being granted the LWLock. - * If so, loop back and wait again. Once we've gotten the lock, + * LWLock while one of those is pending, it is possible that we get + * awakened for a reason other than being signaled by LWLockRelease. + * If so, loop back and wait again. Once we've gotten the LWLock, * re-increment the sema by the number of additional signals * received, so that the lock manager or signal manager will see * the received signal when it next waits. @@ -287,23 +311,22 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode) LOG_LWDEBUG("LWLockAcquire", lockid, "awakened"); - /* - * The awakener already updated the lock struct's state, so we - * don't need to do anything more to it. Just need to fix the - * semaphore count. - */ - while (extraWaits-- > 0) - IpcSemaphoreUnlock(proc->sem.semId, proc->sem.semNum); - } - else - { - /* Got the lock without waiting */ - SpinLockRelease_NoHoldoff(&lock->mutex); + /* Now loop back and try to acquire lock again. */ + retry = true; } + /* We are done updating shared state of the lock itself. */ + SpinLockRelease_NoHoldoff(&lock->mutex); + /* Add lock to list of locks held by this backend */ Assert(num_held_lwlocks < MAX_SIMUL_LWLOCKS); held_lwlocks[num_held_lwlocks++] = lockid; + + /* + * Fix the process wait semaphore's count for any absorbed wakeups. + */ + while (extraWaits-- > 0) + IpcSemaphoreUnlock(proc->sem.semId, proc->sem.semNum); } /* @@ -344,12 +367,7 @@ LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode) } else { - /* - * If there is someone waiting (presumably for exclusive access), - * queue up behind him even though I could get the lock. This - * prevents a stream of read locks from starving a writer. - */ - if (lock->exclusive == 0 && lock->head == NULL) + if (lock->exclusive == 0) { lock->shared++; mustwait = false; @@ -419,33 +437,34 @@ LWLockRelease(LWLockId lockid) /* * See if I need to awaken any waiters. If I released a non-last - * shared hold, there cannot be anything to do. + * shared hold, there cannot be anything to do. Also, do not awaken + * any waiters if someone has already awakened waiters that haven't + * yet acquired the lock. */ head = lock->head; if (head != NULL) { - if (lock->exclusive == 0 && lock->shared == 0) + if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK) { /* - * Remove the to-be-awakened PROCs from the queue, and update - * the lock state to show them as holding the lock. + * Remove the to-be-awakened PROCs from the queue. If the + * front waiter wants exclusive lock, awaken him only. + * Otherwise awaken as many waiters as want shared access. */ proc = head; - if (proc->lwExclusive) - lock->exclusive++; - else + if (!proc->lwExclusive) { - lock->shared++; while (proc->lwWaitLink != NULL && !proc->lwWaitLink->lwExclusive) { proc = proc->lwWaitLink; - lock->shared++; } } /* proc is now the last PROC to be released */ lock->head = proc->lwWaitLink; proc->lwWaitLink = NULL; + /* prevent additional wakeups until retryer gets to run */ + lock->releaseOK = false; } else {