diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index 072b27610f..fde20425f6 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -2098,19 +2098,39 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks) { LWLockId partitionLock = FirstLockMgrLock + partition; SHM_QUEUE *procLocks = &(MyProc->myProcLocks[partition]); + PROCLOCK *nextplock; - proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, - offsetof(PROCLOCK, procLink)); - - if (!proclock) + /* + * If the proclock list for this partition is empty, we can skip + * acquiring the partition lock. This optimization is trickier than + * it looks, because another backend could be in process of adding + * something to our proclock list due to promoting one of our + * fast-path locks. However, any such lock must be one that we + * decided not to delete above, so it's okay to skip it again now; + * we'd just decide not to delete it again. We must, however, be + * careful to re-fetch the list header once we've acquired the + * partition lock, to be sure we have a valid, up-to-date pointer. + * (There is probably no significant risk if pointer fetch/store is + * atomic, but we don't wish to assume that.) + * + * XXX This argument assumes that the locallock table correctly + * represents all of our fast-path locks. While allLocks mode + * guarantees to clean up all of our normal locks regardless of the + * locallock situation, we lose that guarantee for fast-path locks. + * This is not ideal. + */ + if (SHMQueueNext(procLocks, procLocks, + offsetof(PROCLOCK, procLink)) == NULL) continue; /* needn't examine this partition */ LWLockAcquire(partitionLock, LW_EXCLUSIVE); - while (proclock) + for (proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, + offsetof(PROCLOCK, procLink)); + proclock; + proclock = nextplock) { bool wakeupNeeded = false; - PROCLOCK *nextplock; /* Get link first, since we may unlink/delete this proclock */ nextplock = (PROCLOCK *) @@ -2123,7 +2143,7 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks) /* Ignore items that are not of the lockmethod to be removed */ if (LOCK_LOCKMETHOD(*lock) != lockmethodid) - goto next_item; + continue; /* * In allLocks mode, force release of all locks even if locallock @@ -2139,7 +2159,7 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks) * holdMask == 0 and are therefore recyclable */ if (proclock->releaseMask == 0 && proclock->holdMask != 0) - goto next_item; + continue; PROCLOCK_PRINT("LockReleaseAll", proclock); LOCK_PRINT("LockReleaseAll", lock, 0); @@ -2168,9 +2188,6 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks) lockMethodTable, LockTagHashCode(&lock->tag), wakeupNeeded); - - next_item: - proclock = nextplock; } /* loop over PROCLOCKs within this partition */ LWLockRelease(partitionLock); @@ -3142,19 +3159,27 @@ PostPrepare_Locks(TransactionId xid) { LWLockId partitionLock = FirstLockMgrLock + partition; SHM_QUEUE *procLocks = &(MyProc->myProcLocks[partition]); + PROCLOCK *nextplock; - proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, - offsetof(PROCLOCK, procLink)); - - if (!proclock) + /* + * If the proclock list for this partition is empty, we can skip + * acquiring the partition lock. This optimization is safer than the + * situation in LockReleaseAll, because we got rid of any fast-path + * locks during AtPrepare_Locks, so there cannot be any case where + * another backend is adding something to our lists now. For safety, + * though, we code this the same way as in LockReleaseAll. + */ + if (SHMQueueNext(procLocks, procLocks, + offsetof(PROCLOCK, procLink)) == NULL) continue; /* needn't examine this partition */ LWLockAcquire(partitionLock, LW_EXCLUSIVE); - while (proclock) + for (proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, + offsetof(PROCLOCK, procLink)); + proclock; + proclock = nextplock) { - PROCLOCK *nextplock; - /* Get link first, since we may unlink/relink this proclock */ nextplock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink, @@ -3166,7 +3191,7 @@ PostPrepare_Locks(TransactionId xid) /* Ignore VXID locks */ if (lock->tag.locktag_type == LOCKTAG_VIRTUALTRANSACTION) - goto next_item; + continue; PROCLOCK_PRINT("PostPrepare_Locks", proclock); LOCK_PRINT("PostPrepare_Locks", lock, 0); @@ -3177,7 +3202,7 @@ PostPrepare_Locks(TransactionId xid) /* Ignore it if nothing to release (must be a session lock) */ if (proclock->releaseMask == 0) - goto next_item; + continue; /* Else we should be releasing all locks */ if (proclock->releaseMask != proclock->holdMask) @@ -3219,9 +3244,6 @@ PostPrepare_Locks(TransactionId xid) &proclock->procLink); PROCLOCK_PRINT("PostPrepare_Locks: updated", proclock); - - next_item: - proclock = nextplock; } /* loop over PROCLOCKs within this partition */ LWLockRelease(partitionLock); @@ -3918,7 +3940,7 @@ VirtualXactLockTableInsert(VirtualTransactionId vxid) * unblocking waiters. */ void -VirtualXactLockTableCleanup() +VirtualXactLockTableCleanup(void) { bool fastpath; LocalTransactionId lxid;