diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index df4853bad1..cd4cc98113 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -93,6 +93,9 @@ static void HeapSatisfiesHOTandKeyUpdate(Relation relation,
 						 bool *satisfies_hot, bool *satisfies_key,
 						 bool *satisfies_id,
 						 HeapTuple oldtup, HeapTuple newtup);
+static bool heap_acquire_tuplock(Relation relation, ItemPointer tid,
+					 LockTupleMode mode, LockWaitPolicy wait_policy,
+					 bool *have_tuple_lock);
 static void compute_new_xmax_infomask(TransactionId xmax, uint16 old_infomask,
 						  uint16 old_infomask2, TransactionId add_to_xmax,
 						  LockTupleMode mode, bool is_update,
@@ -105,6 +108,8 @@ static void GetMultiXactIdHintBits(MultiXactId multi, uint16 *new_infomask,
 						uint16 *new_infomask2);
 static TransactionId MultiXactIdGetUpdateXid(TransactionId xmax,
 						uint16 t_infomask);
+static bool DoesMultiXactIdConflict(MultiXactId multi, uint16 infomask,
+						LockTupleMode lockmode);
 static void MultiXactIdWait(MultiXactId multi, MultiXactStatus status, uint16 infomask,
 				Relation rel, ItemPointer ctid, XLTW_Oper oper,
 				int *remaining);
@@ -2700,11 +2705,8 @@ l1:
 		 * this arranges that we stay at the head of the line while rechecking
 		 * tuple state.
 		 */
-		if (!have_tuple_lock)
-		{
-			LockTupleTuplock(relation, &(tp.t_self), LockTupleExclusive);
-			have_tuple_lock = true;
-		}
+		heap_acquire_tuplock(relation, &(tp.t_self), LockTupleExclusive,
+							 LockWaitBlock, &have_tuple_lock);
 
 		/*
 		 * Sleep until concurrent transaction ends.  Note that we don't care
@@ -3223,21 +3225,6 @@ l2:
 
 		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 
-		/*
-		 * Acquire tuple lock to establish our priority for the tuple (see
-		 * heap_lock_tuple).  LockTuple will release us when we are
-		 * next-in-line for the tuple.
-		 *
-		 * If we are forced to "start over" below, we keep the tuple lock;
-		 * this arranges that we stay at the head of the line while rechecking
-		 * tuple state.
-		 */
-		if (!have_tuple_lock)
-		{
-			LockTupleTuplock(relation, &(oldtup.t_self), *lockmode);
-			have_tuple_lock = true;
-		}
-
 		/*
 		 * Now we have to do something about the existing locker.  If it's a
 		 * multi, sleep on it; we might be awakened before it is completely
@@ -3248,12 +3235,30 @@ l2:
 		 * before actually going to sleep.  If the update doesn't conflict
 		 * with the locks, we just continue without sleeping (but making sure
 		 * it is preserved).
+		 *
+		 * Before sleeping, we need to acquire tuple lock to establish our
+		 * priority for the tuple (see heap_lock_tuple).  LockTuple will
+		 * release us when we are next-in-line for the tuple.  Note we must not
+		 * acquire the tuple lock until we're sure we're going to sleep;
+		 * otherwise we're open for race conditions with other transactions
+		 * holding the tuple lock which sleep on us.
+		 *
+		 * If we are forced to "start over" below, we keep the tuple lock;
+		 * this arranges that we stay at the head of the line while rechecking
+		 * tuple state.
 		 */
 		if (infomask & HEAP_XMAX_IS_MULTI)
 		{
 			TransactionId update_xact;
 			int			remain;
 
+			/* acquire tuple lock, if necessary */
+			if (DoesMultiXactIdConflict((MultiXactId) xwait, infomask, *lockmode))
+			{
+				heap_acquire_tuplock(relation, &(oldtup.t_self), *lockmode,
+									 LockWaitBlock, &have_tuple_lock);
+			}
+
 			/* wait for multixact */
 			MultiXactIdWait((MultiXactId) xwait, mxact_status, infomask,
 							relation, &oldtup.t_data->t_ctid, XLTW_Update,
@@ -3330,7 +3335,12 @@ l2:
 		}
 		else
 		{
-			/* wait for regular transaction to end */
+			/*
+			 * Wait for regular transaction to end; but first, acquire
+			 * tuple lock.
+			 */
+			heap_acquire_tuplock(relation, &(oldtup.t_self), *lockmode,
+								 LockWaitBlock, &have_tuple_lock);
 			XactLockTableWait(xwait, relation, &oldtup.t_data->t_ctid,
 							  XLTW_Update);
 			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
@@ -4038,7 +4048,6 @@ get_mxact_status_for_lock(LockTupleMode mode, bool is_update)
 	return (MultiXactStatus) retval;
 }
 
-
 /*
  * heap_lock_tuple - lock a tuple in shared or exclusive mode
 *
@@ -4170,42 +4179,6 @@ l3:
 			pfree(members);
 		}
 
-		/*
-		 * Acquire tuple lock to establish our priority for the tuple.
-		 * LockTuple will release us when we are next-in-line for the tuple.
-		 * We must do this even if we are share-locking.
-		 *
-		 * If we are forced to "start over" below, we keep the tuple lock;
-		 * this arranges that we stay at the head of the line while rechecking
-		 * tuple state.
-		 */
-		if (!have_tuple_lock)
-		{
-			switch (wait_policy)
-			{
-				case LockWaitBlock:
-					LockTupleTuplock(relation, tid, mode);
-					break;
-				case LockWaitSkip:
-					if (!ConditionalLockTupleTuplock(relation, tid, mode))
-					{
-						result = HeapTupleWouldBlock;
-						/* recovery code expects to have buffer lock held */
-						LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
-						goto failed;
-					}
-					break;
-				case LockWaitError:
-					if (!ConditionalLockTupleTuplock(relation, tid, mode))
-						ereport(ERROR,
-								(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
-								 errmsg("could not obtain lock on row in relation \"%s\"",
-										RelationGetRelationName(relation))));
-					break;
-			}
-			have_tuple_lock = true;
-		}
-
 		/*
 		 * Initially assume that we will have to wait for the locking
 		 * transaction(s) to finish.  We check various cases below in which
@@ -4312,67 +4285,27 @@ l3:
 		{
 			/*
 			 * If we're requesting NoKeyExclusive, we might also be able to
-			 * avoid sleeping; just ensure that there's no other lock type
-			 * than KeyShare.  Note that this is a bit more involved than just
-			 * checking hint bits -- we need to expand the multixact to figure
-			 * out lock modes for each one (unless there was only one such
-			 * locker).
+			 * avoid sleeping; just ensure that there is no conflicting lock
+			 * already acquired.
 			 */
 			if (infomask & HEAP_XMAX_IS_MULTI)
 			{
-				int			nmembers;
-				MultiXactMember *members;
-
-				/*
-				 * We don't need to allow old multixacts here; if that had
-				 * been the case, HeapTupleSatisfiesUpdate would have returned
-				 * MayBeUpdated and we wouldn't be here.
-				 */
-				nmembers =
-					GetMultiXactIdMembers(xwait, &members, false,
-										  HEAP_XMAX_IS_LOCKED_ONLY(infomask));
-
-				if (nmembers <= 0)
+				if (!DoesMultiXactIdConflict((MultiXactId) xwait, infomask,
+											 mode))
 				{
 					/*
-					 * No need to keep the previous xmax here. This is
-					 * unlikely to happen.
+					 * No conflict, but if the xmax changed under us in the
+					 * meantime, start over.
 					 */
+					LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+					if (xmax_infomask_changed(tuple->t_data->t_infomask, infomask) ||
+						!TransactionIdEquals(HeapTupleHeaderGetRawXmax(tuple->t_data),
+											 xwait))
+						goto l3;
+
+					/* otherwise, we're good */
 					require_sleep = false;
 				}
-				else
-				{
-					int			i;
-					bool		allowed = true;
-
-					for (i = 0; i < nmembers; i++)
-					{
-						if (members[i].status != MultiXactStatusForKeyShare)
-						{
-							allowed = false;
-							break;
-						}
-					}
-					if (allowed)
-					{
-						/*
-						 * if the xmax changed under us in the meantime, start
-						 * over.
-						 */
-						LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
-						if (xmax_infomask_changed(tuple->t_data->t_infomask, infomask) ||
-							!TransactionIdEquals(HeapTupleHeaderGetRawXmax(tuple->t_data),
-												 xwait))
-						{
-							pfree(members);
-							goto l3;
-						}
-						/* otherwise, we're good */
-						require_sleep = false;
-					}
-
-					pfree(members);
-				}
 			}
 			else if (HEAP_XMAX_IS_KEYSHR_LOCKED(infomask))
 			{
@@ -4397,6 +4330,28 @@ l3:
 
 		if (require_sleep)
 		{
+			/*
+			 * Acquire tuple lock to establish our priority for the tuple.
+			 * LockTuple will release us when we are next-in-line for the tuple.
+			 * We must do this even if we are share-locking.
+			 *
+			 * If we are forced to "start over" below, we keep the tuple lock;
+			 * this arranges that we stay at the head of the line while rechecking
+			 * tuple state.
+			 */
+			if (!heap_acquire_tuplock(relation, tid, mode, wait_policy,
+									  &have_tuple_lock))
+			{
+				/*
+				 * This can only happen if wait_policy is Skip and the lock
+				 * couldn't be obtained.
+				 */
+				result = HeapTupleWouldBlock;
+				/* recovery code expects to have buffer lock held */
+				LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+				goto failed;
+			}
+
 			if (infomask & HEAP_XMAX_IS_MULTI)
 			{
 				MultiXactStatus status = get_mxact_status_for_lock(mode, false);
@@ -4713,6 +4668,48 @@ failed:
 	return HeapTupleMayBeUpdated;
 }
 
+/*
+ * Acquire heavyweight lock on the given tuple, in preparation for acquiring
+ * its normal, Xmax-based tuple lock.
+ *
+ * have_tuple_lock is an input and output parameter: on input, it indicates
+ * whether the lock has previously been acquired (and this function does
+ * nothing in that case).  If this function returns success, have_tuple_lock
+ * has been flipped to true.
+ *
+ * Returns false if it was unable to obtain the lock; this can only happen if
+ * wait_policy is Skip.
+ */
+static bool
+heap_acquire_tuplock(Relation relation, ItemPointer tid, LockTupleMode mode,
+					 LockWaitPolicy wait_policy, bool *have_tuple_lock)
+{
+	if (*have_tuple_lock)
+		return true;
+
+	switch (wait_policy)
+	{
+		case LockWaitBlock:
+			LockTupleTuplock(relation, tid, mode);
+			break;
+
+		case LockWaitSkip:
+			if (!ConditionalLockTupleTuplock(relation, tid, mode))
+				return false;
+			break;
+
+		case LockWaitError:
+			if (!ConditionalLockTupleTuplock(relation, tid, mode))
+				ereport(ERROR,
+						(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+						 errmsg("could not obtain lock on row in relation \"%s\"",
+								RelationGetRelationName(relation))));
+			break;
+	}
+	*have_tuple_lock = true;
+
+	return true;
+}
 
 /*
  * Given an original set of Xmax and infomask, and a transaction (identified by
@@ -6091,6 +6088,70 @@ HeapTupleGetUpdateXid(HeapTupleHeader tuple)
 								   tuple->t_infomask);
 }
 
+/*
+ * Does the given multixact conflict with the current transaction grabbing a
+ * tuple lock of the given strength?
+ *
+ * The passed infomask pairs up with the given multixact in the tuple header.
+ */
+static bool
+DoesMultiXactIdConflict(MultiXactId multi, uint16 infomask,
+						LockTupleMode lockmode)
+{
+	bool		allow_old;
+	int			nmembers;
+	MultiXactMember *members;
+	bool		result = false;
+
+	allow_old = !(infomask & HEAP_LOCK_MASK) && HEAP_XMAX_IS_LOCKED_ONLY(infomask);
+	nmembers = GetMultiXactIdMembers(multi, &members, allow_old,
+									 HEAP_XMAX_IS_LOCKED_ONLY(infomask));
+	if (nmembers >= 0)
+	{
+		int			i;
+
+		for (i = 0; i < nmembers; i++)
+		{
+			TransactionId memxid;
+			LockTupleMode memlockmode;
+
+			memlockmode = LOCKMODE_from_mxstatus(members[i].status);
+			/* ignore members that don't conflict with the lock we want */
+			if (!DoLockModesConflict(memlockmode, lockmode))
+				continue;
+
+			/* ignore members from current xact */
+			memxid = members[i].xid;
+			if (TransactionIdIsCurrentTransactionId(memxid))
+				continue;
+
+			if (ISUPDATE_from_mxstatus(members[i].status))
+			{
+				/* ignore aborted updaters */
+				if (TransactionIdDidAbort(memxid))
+					continue;
+			}
+			else
+			{
+				/* ignore lockers-only that are no longer in progress */
+				if (!TransactionIdIsInProgress(memxid))
+					continue;
+			}
+
+			/*
+			 * Whatever remains are either live lockers that conflict with our
+			 * wanted lock, or updaters that are not aborted.  Those conflict
+			 * with what we want, so return true.
+			 */
+			result = true;
+			break;
+		}
+		pfree(members);
+	}
+
+	return result;
+}
+
 /*
  * Do_MultiXactIdWait
  *		Actual implementation for the two functions below.