Factor out lock cleanup code that is needed in several places in lock.c.

Also, remove the rather useless return value of LockReleaseAll.  Change
response to detection of corruption in the shared lock tables to PANIC,
since that is the only way of cleaning up fully.
Originally an idea of Heikki Linnakangas, variously hacked on by
Alvaro Herrera and Tom Lane.
This commit is contained in:
Tom Lane 2005-05-19 23:30:18 +00:00
parent ee3b71f6bc
commit 191b13aaca
3 changed files with 83 additions and 141 deletions

View File

@ -75,5 +75,7 @@ user_write_unlock_oid(Oid oid)
int int
user_unlock_all(void) user_unlock_all(void)
{ {
return LockReleaseAll(USER_LOCKMETHOD, true); LockReleaseAll(USER_LOCKMETHOD, true);
return true;
} }

View File

@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.151 2005/05/11 01:26:02 neilc Exp $ * $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.152 2005/05/19 23:30:18 tgl Exp $
* *
* NOTES * NOTES
* Outside modules can create a lock table and acquire/release * Outside modules can create a lock table and acquire/release
@ -166,6 +166,8 @@ static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc,
int *myHolding); int *myHolding);
static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode, static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
PROCLOCK *proclock, LockMethod lockMethodTable); PROCLOCK *proclock, LockMethod lockMethodTable);
static void CleanUpLock(LOCKMETHODID lockmethodid, LOCK *lock,
PROCLOCK *proclock, bool wakeupNeeded);
/* /*
@ -589,13 +591,12 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
* anyone to release the lock object later. * anyone to release the lock object later.
*/ */
Assert(SHMQueueEmpty(&(lock->procLocks))); Assert(SHMQueueEmpty(&(lock->procLocks)));
lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid], if (!hash_search(LockMethodLockHash[lockmethodid],
(void *) &(lock->tag), (void *) &(lock->tag),
HASH_REMOVE, NULL); HASH_REMOVE, NULL))
elog(PANIC, "lock table corrupted");
} }
LWLockRelease(masterLock); LWLockRelease(masterLock);
if (!lock) /* hash remove failed? */
elog(WARNING, "lock table corrupted");
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY), (errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory"), errmsg("out of shared memory"),
@ -708,11 +709,10 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
{ {
SHMQueueDelete(&proclock->lockLink); SHMQueueDelete(&proclock->lockLink);
SHMQueueDelete(&proclock->procLink); SHMQueueDelete(&proclock->procLink);
proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid], if (!hash_search(LockMethodProcLockHash[lockmethodid],
(void *) &(proclock->tag), (void *) &(proclock->tag),
HASH_REMOVE, NULL); HASH_REMOVE, NULL))
if (!proclock) elog(PANIC, "proclock table corrupted");
elog(WARNING, "proclock table corrupted");
} }
else else
PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock); PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock);
@ -784,10 +784,9 @@ RemoveLocalLock(LOCALLOCK *locallock)
pfree(locallock->lockOwners); pfree(locallock->lockOwners);
locallock->lockOwners = NULL; locallock->lockOwners = NULL;
locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid], if (!hash_search(LockMethodLocalHash[lockmethodid],
(void *) &(locallock->tag), (void *) &(locallock->tag),
HASH_REMOVE, NULL); HASH_REMOVE, NULL))
if (!locallock)
elog(WARNING, "locallock table corrupted"); elog(WARNING, "locallock table corrupted");
} }
@ -993,6 +992,55 @@ UnGrantLock(LOCK *lock, LOCKMODE lockmode,
return wakeupNeeded; return wakeupNeeded;
} }
/*
* CleanUpLock -- clean up after releasing a lock. We garbage-collect the
* proclock and lock objects if possible, and call ProcLockWakeup if there
* are remaining requests and the caller says it's OK. (Normally, this
* should be called after UnGrantLock, and wakeupNeeded is the result from
* UnGrantLock.)
*
* The locktable's masterLock must be held at entry, and will be
* held at exit.
*/
static void
CleanUpLock(LOCKMETHODID lockmethodid, LOCK *lock, PROCLOCK *proclock,
bool wakeupNeeded)
{
/*
* If this was my last hold on this lock, delete my entry in the
* proclock table.
*/
if (proclock->holdMask == 0)
{
PROCLOCK_PRINT("CleanUpLock: deleting", proclock);
SHMQueueDelete(&proclock->lockLink);
SHMQueueDelete(&proclock->procLink);
if (!hash_search(LockMethodProcLockHash[lockmethodid],
(void *) &(proclock->tag),
HASH_REMOVE, NULL))
elog(PANIC, "proclock table corrupted");
}
if (lock->nRequested == 0)
{
/*
* The caller just released the last lock, so garbage-collect the
* lock object.
*/
LOCK_PRINT("CleanUpLock: deleting", lock, 0);
Assert(SHMQueueEmpty(&(lock->procLocks)));
if (!hash_search(LockMethodLockHash[lockmethodid],
(void *) &(lock->tag),
HASH_REMOVE, NULL))
elog(PANIC, "lock table corrupted");
}
else if (wakeupNeeded)
{
/* There are waiters on this lock, so wake them up. */
ProcLockWakeup(LockMethods[lockmethodid], lock);
}
}
/* /*
* GrantLockLocal -- update the locallock data structures to show * GrantLockLocal -- update the locallock data structures to show
* the lock request has been granted. * the lock request has been granted.
@ -1160,30 +1208,12 @@ RemoveFromWaitQueue(PGPROC *proc)
/* /*
* Delete the proclock immediately if it represents no already-held locks. * Delete the proclock immediately if it represents no already-held locks.
* This must happen now because if the owner of the lock decides to release * (This must happen now because if the owner of the lock decides to
* it, and the requested/granted counts then go to zero, LockRelease * release it, and the requested/granted counts then go to zero,
* expects there to be no remaining proclocks. * LockRelease expects there to be no remaining proclocks.)
* Then see if any other waiters for the lock can be woken up now.
*/ */
if (proclock->holdMask == 0) CleanUpLock(lockmethodid, waitLock, proclock, true);
{
PROCLOCK_PRINT("RemoveFromWaitQueue: deleting proclock", proclock);
SHMQueueDelete(&proclock->lockLink);
SHMQueueDelete(&proclock->procLink);
proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
(void *) &(proclock->tag),
HASH_REMOVE, NULL);
if (!proclock)
elog(WARNING, "proclock table corrupted");
}
/*
* There should still be some requests for the lock ... else what were
* we waiting for? Therefore no need to delete the lock object.
*/
Assert(waitLock->nRequested > 0);
/* See if any other waiters for the lock can be woken up now */
ProcLockWakeup(LockMethods[lockmethodid], waitLock);
} }
/* /*
@ -1221,10 +1251,7 @@ LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
Assert(lockmethodid < NumLockMethods); Assert(lockmethodid < NumLockMethods);
lockMethodTable = LockMethods[lockmethodid]; lockMethodTable = LockMethods[lockmethodid];
if (!lockMethodTable) if (!lockMethodTable)
{ elog(ERROR, "bad lock method: %d", lockmethodid);
elog(WARNING, "lockMethodTable is null in LockRelease");
return FALSE;
}
/* /*
* Find the LOCALLOCK entry for this lock and lockmode * Find the LOCALLOCK entry for this lock and lockmode
@ -1328,56 +1355,12 @@ LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
return FALSE; return FALSE;
} }
/*
* Do the releasing. CleanUpLock will waken any now-wakable waiters.
*/
wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable); wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
/* CleanUpLock(lockmethodid, lock, proclock, wakeupNeeded);
* If this was my last hold on this lock, delete my entry in the
* proclock table.
*/
if (proclock->holdMask == 0)
{
PROCLOCK_PRINT("LockRelease: deleting proclock", proclock);
SHMQueueDelete(&proclock->lockLink);
SHMQueueDelete(&proclock->procLink);
proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
(void *) &(proclock->tag),
HASH_REMOVE, NULL);
if (!proclock)
{
LWLockRelease(masterLock);
elog(WARNING, "proclock table corrupted");
RemoveLocalLock(locallock);
return FALSE;
}
}
if (lock->nRequested == 0)
{
/*
* We've just released the last lock, so garbage-collect the lock
* object.
*/
LOCK_PRINT("LockRelease: deleting lock", lock, lockmode);
Assert(SHMQueueEmpty(&(lock->procLocks)));
lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
(void *) &(lock->tag),
HASH_REMOVE, NULL);
if (!lock)
{
LWLockRelease(masterLock);
elog(WARNING, "lock table corrupted");
RemoveLocalLock(locallock);
return FALSE;
}
}
else
{
/*
* Wake up waiters if needed.
*/
if (wakeupNeeded)
ProcLockWakeup(lockMethodTable, lock);
}
LWLockRelease(masterLock); LWLockRelease(masterLock);
@ -1397,7 +1380,7 @@ LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
* allxids == false: release all locks with Xid != 0 * allxids == false: release all locks with Xid != 0
* (zero is the Xid used for "session" locks). * (zero is the Xid used for "session" locks).
*/ */
bool void
LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids) LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids)
{ {
HASH_SEQ_STATUS status; HASH_SEQ_STATUS status;
@ -1418,10 +1401,7 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids)
Assert(lockmethodid < NumLockMethods); Assert(lockmethodid < NumLockMethods);
lockMethodTable = LockMethods[lockmethodid]; lockMethodTable = LockMethods[lockmethodid];
if (!lockMethodTable) if (!lockMethodTable)
{ elog(ERROR, "bad lock method: %d", lockmethodid);
elog(WARNING, "bad lock method: %d", lockmethodid);
return FALSE;
}
numLockModes = lockMethodTable->numLockModes; numLockModes = lockMethodTable->numLockModes;
masterLock = lockMethodTable->masterLock; masterLock = lockMethodTable->masterLock;
@ -1516,48 +1496,10 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids)
Assert(lock->nGranted <= lock->nRequested); Assert(lock->nGranted <= lock->nRequested);
LOCK_PRINT("LockReleaseAll: updated", lock, 0); LOCK_PRINT("LockReleaseAll: updated", lock, 0);
PROCLOCK_PRINT("LockReleaseAll: deleting", proclock); Assert(proclock->holdMask == 0);
/* /* CleanUpLock will wake up waiters if needed. */
* Remove the proclock entry from the linked lists CleanUpLock(lockmethodid, lock, proclock, wakeupNeeded);
*/
SHMQueueDelete(&proclock->lockLink);
SHMQueueDelete(&proclock->procLink);
/*
* remove the proclock entry from the hashtable
*/
proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
(void *) &(proclock->tag),
HASH_REMOVE,
NULL);
if (!proclock)
{
LWLockRelease(masterLock);
elog(WARNING, "proclock table corrupted");
return FALSE;
}
if (lock->nRequested == 0)
{
/*
* We've just released the last lock, so garbage-collect the
* lock object.
*/
LOCK_PRINT("LockReleaseAll: deleting", lock, 0);
Assert(SHMQueueEmpty(&(lock->procLocks)));
lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
(void *) &(lock->tag),
HASH_REMOVE, NULL);
if (!lock)
{
LWLockRelease(masterLock);
elog(WARNING, "lock table corrupted");
return FALSE;
}
}
else if (wakeupNeeded)
ProcLockWakeup(lockMethodTable, lock);
next_item: next_item:
proclock = nextHolder; proclock = nextHolder;
@ -1569,8 +1511,6 @@ next_item:
if (lockmethodid == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks) if (lockmethodid == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
elog(LOG, "LockReleaseAll done"); elog(LOG, "LockReleaseAll done");
#endif #endif
return TRUE;
} }
/* /*

View File

@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/include/storage/lock.h,v 1.85 2005/04/29 22:28:24 tgl Exp $ * $PostgreSQL: pgsql/src/include/storage/lock.h,v 1.86 2005/05/19 23:30:18 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -361,7 +361,7 @@ extern bool LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
TransactionId xid, LOCKMODE lockmode, bool dontWait); TransactionId xid, LOCKMODE lockmode, bool dontWait);
extern bool LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag, extern bool LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
TransactionId xid, LOCKMODE lockmode); TransactionId xid, LOCKMODE lockmode);
extern bool LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids); extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids);
extern void LockReleaseCurrentOwner(void); extern void LockReleaseCurrentOwner(void);
extern void LockReassignCurrentOwner(void); extern void LockReassignCurrentOwner(void);
extern int LockCheckConflicts(LockMethod lockMethodTable, extern int LockCheckConflicts(LockMethod lockMethodTable,