diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index 8e42e5745f..e35ac9910c 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -28,6 +28,8 @@ #include "storage/procarray.h" #include "storage/sinvaladt.h" #include "storage/standby.h" +#include "utils/hsearch.h" +#include "utils/memutils.h" #include "utils/ps_status.h" #include "utils/timeout.h" #include "utils/timestamp.h" @@ -37,7 +39,7 @@ int vacuum_defer_cleanup_age; int max_standby_archive_delay = 30 * 1000; int max_standby_streaming_delay = 30 * 1000; -static List *RecoveryLockList; +static HTAB *RecoveryLockLists; static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist, ProcSignalReason reason); @@ -45,6 +47,14 @@ static void SendRecoveryConflictWithBufferPin(ProcSignalReason reason); static XLogRecPtr LogCurrentRunningXacts(RunningTransactions CurrRunningXacts); static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks); +/* + * Keep track of all the locks owned by a given transaction. + */ +typedef struct RecoveryLockListsEntry +{ + TransactionId xid; + List *locks; +} RecoveryLockListsEntry; /* * InitRecoveryTransactionEnvironment @@ -62,6 +72,19 @@ void InitRecoveryTransactionEnvironment(void) { VirtualTransactionId vxid; + HASHCTL hash_ctl; + + /* + * Initialize the hash table for tracking the list of locks held by each + * transaction. + */ + memset(&hash_ctl, 0, sizeof(hash_ctl)); + hash_ctl.keysize = sizeof(TransactionId); + hash_ctl.entrysize = sizeof(RecoveryLockListsEntry); + RecoveryLockLists = hash_create("RecoveryLockLists", + 64, + &hash_ctl, + HASH_ELEM | HASH_BLOBS); /* * Initialize shared invalidation management for Startup process, being @@ -106,6 +129,10 @@ ShutdownRecoveryTransactionEnvironment(void) /* Release all locks the tracked transactions were holding */ StandbyReleaseAllLocks(); + /* Destroy the hash table of locks. */ + hash_destroy(RecoveryLockLists); + RecoveryLockLists = NULL; + /* Cleanup our VirtualTransaction */ VirtualXactLockTableCleanup(); } @@ -585,8 +612,8 @@ StandbyLockTimeoutHandler(void) * We only keep track of AccessExclusiveLocks, which are only ever held by * one transaction on one relation. * - * We keep a single dynamically expandible list of locks in local memory, - * RecoveryLockList, so we can keep track of the various entries made by + * We keep a hash table of lists of locks in local memory keyed by xid, + * RecoveryLockLists, so we can keep track of the various entries made by * the Startup process's virtual xid in the shared lock table. * * We record the lock against the top-level xid, rather than individual @@ -604,8 +631,10 @@ StandbyLockTimeoutHandler(void) void StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid) { + RecoveryLockListsEntry *entry; xl_standby_lock *newlock; LOCKTAG locktag; + bool found; /* Already processed? */ if (!TransactionIdIsValid(xid) || @@ -619,58 +648,68 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid) /* dbOid is InvalidOid when we are locking a shared relation. */ Assert(OidIsValid(relOid)); + /* Create a new list for this xid, if we don't have one already. */ + entry = hash_search(RecoveryLockLists, &xid, HASH_ENTER, &found); + if (!found) + { + entry->xid = xid; + entry->locks = NIL; + } + newlock = palloc(sizeof(xl_standby_lock)); newlock->xid = xid; newlock->dbOid = dbOid; newlock->relOid = relOid; - RecoveryLockList = lappend(RecoveryLockList, newlock); + entry->locks = lappend(entry->locks, newlock); SET_LOCKTAG_RELATION(locktag, newlock->dbOid, newlock->relOid); LockAcquireExtended(&locktag, AccessExclusiveLock, true, false, false); } +static void +StandbyReleaseLockList(List *locks) +{ + while (locks) + { + xl_standby_lock *lock = (xl_standby_lock *) linitial(locks); + LOCKTAG locktag; + elog(trace_recovery(DEBUG4), + "releasing recovery lock: xid %u db %u rel %u", + lock->xid, lock->dbOid, lock->relOid); + SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid); + if (!LockRelease(&locktag, AccessExclusiveLock, true)) + { + elog(LOG, + "RecoveryLockLists contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u", + lock->xid, lock->dbOid, lock->relOid); + Assert(false); + } + pfree(lock); + locks = list_delete_first(locks); + } +} + static void StandbyReleaseLocks(TransactionId xid) { - ListCell *cell, - *prev, - *next; + RecoveryLockListsEntry *entry; - /* - * Release all matching locks and remove them from list - */ - prev = NULL; - for (cell = list_head(RecoveryLockList); cell; cell = next) + if (TransactionIdIsValid(xid)) { - xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell); - - next = lnext(cell); - - if (!TransactionIdIsValid(xid) || lock->xid == xid) + if ((entry = hash_search(RecoveryLockLists, &xid, HASH_FIND, NULL))) { - LOCKTAG locktag; - - elog(trace_recovery(DEBUG4), - "releasing recovery lock: xid %u db %u rel %u", - lock->xid, lock->dbOid, lock->relOid); - SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid); - if (!LockRelease(&locktag, AccessExclusiveLock, true)) - elog(LOG, - "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u", - lock->xid, lock->dbOid, lock->relOid); - - RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev); - pfree(lock); + StandbyReleaseLockList(entry->locks); + hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL); } - else - prev = cell; } + else + StandbyReleaseAllLocks(); } /* * Release locks for a transaction tree, starting at xid down, from - * RecoveryLockList. + * RecoveryLockLists. * * Called during WAL replay of COMMIT/ROLLBACK when in hot standby mode, * to remove any AccessExclusiveLocks requested by a transaction. @@ -692,30 +731,16 @@ StandbyReleaseLockTree(TransactionId xid, int nsubxids, TransactionId *subxids) void StandbyReleaseAllLocks(void) { - ListCell *cell, - *prev, - *next; - LOCKTAG locktag; + HASH_SEQ_STATUS status; + RecoveryLockListsEntry *entry; elog(trace_recovery(DEBUG2), "release all standby locks"); - prev = NULL; - for (cell = list_head(RecoveryLockList); cell; cell = next) + hash_seq_init(&status, RecoveryLockLists); + while ((entry = hash_seq_search(&status))) { - xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell); - - next = lnext(cell); - - elog(trace_recovery(DEBUG4), - "releasing recovery lock: xid %u db %u rel %u", - lock->xid, lock->dbOid, lock->relOid); - SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid); - if (!LockRelease(&locktag, AccessExclusiveLock, true)) - elog(LOG, - "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u", - lock->xid, lock->dbOid, lock->relOid); - RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev); - pfree(lock); + StandbyReleaseLockList(entry->locks); + hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL); } } @@ -727,22 +752,17 @@ StandbyReleaseAllLocks(void) void StandbyReleaseOldLocks(int nxids, TransactionId *xids) { - ListCell *cell, - *prev, - *next; - LOCKTAG locktag; + HASH_SEQ_STATUS status; + RecoveryLockListsEntry *entry; - prev = NULL; - for (cell = list_head(RecoveryLockList); cell; cell = next) + hash_seq_init(&status, RecoveryLockLists); + while ((entry = hash_seq_search(&status))) { - xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell); bool remove = false; - next = lnext(cell); + Assert(TransactionIdIsValid(entry->xid)); - Assert(TransactionIdIsValid(lock->xid)); - - if (StandbyTransactionIdIsPrepared(lock->xid)) + if (StandbyTransactionIdIsPrepared(entry->xid)) remove = false; else { @@ -751,7 +771,7 @@ StandbyReleaseOldLocks(int nxids, TransactionId *xids) for (i = 0; i < nxids; i++) { - if (lock->xid == xids[i]) + if (entry->xid == xids[i]) { found = true; break; @@ -767,19 +787,9 @@ StandbyReleaseOldLocks(int nxids, TransactionId *xids) if (remove) { - elog(trace_recovery(DEBUG4), - "releasing recovery lock: xid %u db %u rel %u", - lock->xid, lock->dbOid, lock->relOid); - SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid); - if (!LockRelease(&locktag, AccessExclusiveLock, true)) - elog(LOG, - "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u", - lock->xid, lock->dbOid, lock->relOid); - RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev); - pfree(lock); + StandbyReleaseLockList(entry->locks); + hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL); } - else - prev = cell; } }