Allow locking updated tuples in tuple_update() and tuple_delete()
Currently, in read committed transaction isolation mode (default), we have the following sequence of actions when tuple_update()/tuple_delete() finds the tuple updated by concurrent transaction. 1. Attempt to update/delete tuple with tuple_update()/tuple_delete(), which returns TM_Updated. 2. Lock tuple with tuple_lock(). 3. Re-evaluate plan qual (recheck if we still need to update/delete and calculate the new tuple for update). 4. Second attempt to update/delete tuple with tuple_update()/tuple_delete(). This attempt should be successful, since the tuple was previously locked. This patch eliminates step 2 by taking the lock during first tuple_update()/tuple_delete() call. Heap table access method saves some efforts by checking the updated tuple once instead of twice. Future undo-based table access methods, which will start from the latest row version, can immediately place a lock there. The code in nodeModifyTable.c is simplified by removing the nested switch/case. Discussion: https://postgr.es/m/CAPpHfdua-YFw3XTprfutzGp28xXLigFtzNbuFY8yPhqeq6X5kg%40mail.gmail.com Reviewed-by: Aleksander Alekseev, Pavel Borisov, Vignesh C, Mason Sharp Reviewed-by: Andres Freund, Chris Travers
This commit is contained in:
parent
764da7710b
commit
11470f544e
|
@ -45,6 +45,12 @@
|
||||||
#include "utils/builtins.h"
|
#include "utils/builtins.h"
|
||||||
#include "utils/rel.h"
|
#include "utils/rel.h"
|
||||||
|
|
||||||
|
static TM_Result heapam_tuple_lock_internal(Relation relation, ItemPointer tid,
|
||||||
|
Snapshot snapshot, TupleTableSlot *slot,
|
||||||
|
CommandId cid, LockTupleMode mode,
|
||||||
|
LockWaitPolicy wait_policy, uint8 flags,
|
||||||
|
TM_FailureData *tmfd, bool updated);
|
||||||
|
|
||||||
static void reform_and_rewrite_tuple(HeapTuple tuple,
|
static void reform_and_rewrite_tuple(HeapTuple tuple,
|
||||||
Relation OldHeap, Relation NewHeap,
|
Relation OldHeap, Relation NewHeap,
|
||||||
Datum *values, bool *isnull, RewriteState rwstate);
|
Datum *values, bool *isnull, RewriteState rwstate);
|
||||||
|
@ -299,14 +305,46 @@ heapam_tuple_complete_speculative(Relation relation, TupleTableSlot *slot,
|
||||||
static TM_Result
|
static TM_Result
|
||||||
heapam_tuple_delete(Relation relation, ItemPointer tid, CommandId cid,
|
heapam_tuple_delete(Relation relation, ItemPointer tid, CommandId cid,
|
||||||
Snapshot snapshot, Snapshot crosscheck, bool wait,
|
Snapshot snapshot, Snapshot crosscheck, bool wait,
|
||||||
TM_FailureData *tmfd, bool changingPart)
|
TM_FailureData *tmfd, bool changingPart,
|
||||||
|
LazyTupleTableSlot *lockedSlot)
|
||||||
{
|
{
|
||||||
|
TM_Result result;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Currently Deleting of index tuples are handled at vacuum, in case if
|
* Currently Deleting of index tuples are handled at vacuum, in case if
|
||||||
* the storage itself is cleaning the dead tuples by itself, it is the
|
* the storage itself is cleaning the dead tuples by itself, it is the
|
||||||
* time to call the index tuple deletion also.
|
* time to call the index tuple deletion also.
|
||||||
*/
|
*/
|
||||||
return heap_delete(relation, tid, cid, crosscheck, wait, tmfd, changingPart);
|
result = heap_delete(relation, tid, cid, crosscheck, wait,
|
||||||
|
tmfd, changingPart);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If the tuple has been concurrently updated, then get the lock on it.
|
||||||
|
 * (Do this if caller asked for that by providing a 'lockedSlot'.) With the
|
||||||
|
* lock held retry of delete should succeed even if there are more
|
||||||
|
* concurrent update attempts.
|
||||||
|
*/
|
||||||
|
if (result == TM_Updated && lockedSlot)
|
||||||
|
{
|
||||||
|
TupleTableSlot *evalSlot;
|
||||||
|
|
||||||
|
Assert(wait);
|
||||||
|
|
||||||
|
evalSlot = LAZY_TTS_EVAL(lockedSlot);
|
||||||
|
result = heapam_tuple_lock_internal(relation, tid, snapshot,
|
||||||
|
evalSlot, cid, LockTupleExclusive,
|
||||||
|
LockWaitBlock,
|
||||||
|
TUPLE_LOCK_FLAG_FIND_LAST_VERSION,
|
||||||
|
tmfd, true);
|
||||||
|
|
||||||
|
if (result == TM_Ok)
|
||||||
|
{
|
||||||
|
tmfd->traversed = true;
|
||||||
|
return TM_Updated;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -314,7 +352,8 @@ static TM_Result
|
||||||
heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot,
|
heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot,
|
||||||
CommandId cid, Snapshot snapshot, Snapshot crosscheck,
|
CommandId cid, Snapshot snapshot, Snapshot crosscheck,
|
||||||
bool wait, TM_FailureData *tmfd,
|
bool wait, TM_FailureData *tmfd,
|
||||||
LockTupleMode *lockmode, TU_UpdateIndexes *update_indexes)
|
LockTupleMode *lockmode, TU_UpdateIndexes *update_indexes,
|
||||||
|
LazyTupleTableSlot *lockedSlot)
|
||||||
{
|
{
|
||||||
bool shouldFree = true;
|
bool shouldFree = true;
|
||||||
HeapTuple tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
|
HeapTuple tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
|
||||||
|
@ -352,6 +391,32 @@ heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot,
|
||||||
if (shouldFree)
|
if (shouldFree)
|
||||||
pfree(tuple);
|
pfree(tuple);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If the tuple has been concurrently updated, then get the lock on it.
|
||||||
|
 * (Do this if caller asked for that by providing a 'lockedSlot'.) With the
|
||||||
|
* lock held retry of update should succeed even if there are more
|
||||||
|
* concurrent update attempts.
|
||||||
|
*/
|
||||||
|
if (result == TM_Updated && lockedSlot)
|
||||||
|
{
|
||||||
|
TupleTableSlot *evalSlot;
|
||||||
|
|
||||||
|
Assert(wait);
|
||||||
|
|
||||||
|
evalSlot = LAZY_TTS_EVAL(lockedSlot);
|
||||||
|
result = heapam_tuple_lock_internal(relation, otid, snapshot,
|
||||||
|
evalSlot, cid, *lockmode,
|
||||||
|
LockWaitBlock,
|
||||||
|
TUPLE_LOCK_FLAG_FIND_LAST_VERSION,
|
||||||
|
tmfd, true);
|
||||||
|
|
||||||
|
if (result == TM_Ok)
|
||||||
|
{
|
||||||
|
tmfd->traversed = true;
|
||||||
|
return TM_Updated;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -360,10 +425,26 @@ heapam_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot,
|
||||||
TupleTableSlot *slot, CommandId cid, LockTupleMode mode,
|
TupleTableSlot *slot, CommandId cid, LockTupleMode mode,
|
||||||
LockWaitPolicy wait_policy, uint8 flags,
|
LockWaitPolicy wait_policy, uint8 flags,
|
||||||
TM_FailureData *tmfd)
|
TM_FailureData *tmfd)
|
||||||
|
{
|
||||||
|
return heapam_tuple_lock_internal(relation, tid, snapshot, slot, cid,
|
||||||
|
mode, wait_policy, flags, tmfd, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
 * This routine does the work for heapam_tuple_lock(), but also supports the
|
||||||
|
* `updated` argument to re-use the work done by heapam_tuple_update() or
|
||||||
|
* heapam_tuple_delete() on figuring out that tuple was concurrently updated.
|
||||||
|
*/
|
||||||
|
static TM_Result
|
||||||
|
heapam_tuple_lock_internal(Relation relation, ItemPointer tid,
|
||||||
|
Snapshot snapshot, TupleTableSlot *slot,
|
||||||
|
CommandId cid, LockTupleMode mode,
|
||||||
|
LockWaitPolicy wait_policy, uint8 flags,
|
||||||
|
TM_FailureData *tmfd, bool updated)
|
||||||
{
|
{
|
||||||
BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
|
BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
|
||||||
TM_Result result;
|
TM_Result result;
|
||||||
Buffer buffer;
|
Buffer buffer = InvalidBuffer;
|
||||||
HeapTuple tuple = &bslot->base.tupdata;
|
HeapTuple tuple = &bslot->base.tupdata;
|
||||||
bool follow_updates;
|
bool follow_updates;
|
||||||
|
|
||||||
|
@ -374,16 +455,26 @@ heapam_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot,
|
||||||
|
|
||||||
tuple_lock_retry:
|
tuple_lock_retry:
|
||||||
tuple->t_self = *tid;
|
tuple->t_self = *tid;
|
||||||
result = heap_lock_tuple(relation, tuple, cid, mode, wait_policy,
|
if (!updated)
|
||||||
follow_updates, &buffer, tmfd);
|
result = heap_lock_tuple(relation, tuple, cid, mode, wait_policy,
|
||||||
|
follow_updates, &buffer, tmfd);
|
||||||
|
else
|
||||||
|
result = TM_Updated;
|
||||||
|
|
||||||
if (result == TM_Updated &&
|
if (result == TM_Updated &&
|
||||||
(flags & TUPLE_LOCK_FLAG_FIND_LAST_VERSION))
|
(flags & TUPLE_LOCK_FLAG_FIND_LAST_VERSION))
|
||||||
{
|
{
|
||||||
/* Should not encounter speculative tuple on recheck */
|
if (!updated)
|
||||||
Assert(!HeapTupleHeaderIsSpeculative(tuple->t_data));
|
{
|
||||||
|
/* Should not encounter speculative tuple on recheck */
|
||||||
|
Assert(!HeapTupleHeaderIsSpeculative(tuple->t_data));
|
||||||
|
|
||||||
ReleaseBuffer(buffer);
|
ReleaseBuffer(buffer);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
updated = false;
|
||||||
|
}
|
||||||
|
|
||||||
if (!ItemPointerEquals(&tmfd->ctid, &tuple->t_self))
|
if (!ItemPointerEquals(&tmfd->ctid, &tuple->t_self))
|
||||||
{
|
{
|
||||||
|
|
|
@ -306,7 +306,8 @@ simple_table_tuple_delete(Relation rel, ItemPointer tid, Snapshot snapshot)
|
||||||
GetCurrentCommandId(true),
|
GetCurrentCommandId(true),
|
||||||
snapshot, InvalidSnapshot,
|
snapshot, InvalidSnapshot,
|
||||||
true /* wait for commit */ ,
|
true /* wait for commit */ ,
|
||||||
&tmfd, false /* changingPart */ );
|
&tmfd, false /* changingPart */ ,
|
||||||
|
NULL);
|
||||||
|
|
||||||
switch (result)
|
switch (result)
|
||||||
{
|
{
|
||||||
|
@ -355,7 +356,8 @@ simple_table_tuple_update(Relation rel, ItemPointer otid,
|
||||||
GetCurrentCommandId(true),
|
GetCurrentCommandId(true),
|
||||||
snapshot, InvalidSnapshot,
|
snapshot, InvalidSnapshot,
|
||||||
true /* wait for commit */ ,
|
true /* wait for commit */ ,
|
||||||
&tmfd, &lockmode, update_indexes);
|
&tmfd, &lockmode, update_indexes,
|
||||||
|
NULL);
|
||||||
|
|
||||||
switch (result)
|
switch (result)
|
||||||
{
|
{
|
||||||
|
|
|
@ -1324,26 +1324,62 @@ ExecDeletePrologue(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* The implementation for LazyTupleTableSlot wrapper for EPQ slot to be passed
|
||||||
|
* to table_tuple_update()/table_tuple_delete().
|
||||||
|
*/
|
||||||
|
typedef struct
|
||||||
|
{
|
||||||
|
EPQState *epqstate;
|
||||||
|
ResultRelInfo *resultRelInfo;
|
||||||
|
} GetEPQSlotArg;
|
||||||
|
|
||||||
|
static TupleTableSlot *
|
||||||
|
GetEPQSlot(void *arg)
|
||||||
|
{
|
||||||
|
GetEPQSlotArg *slotArg = (GetEPQSlotArg *) arg;
|
||||||
|
|
||||||
|
return EvalPlanQualSlot(slotArg->epqstate,
|
||||||
|
slotArg->resultRelInfo->ri_RelationDesc,
|
||||||
|
slotArg->resultRelInfo->ri_RangeTableIndex);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ExecDeleteAct -- subroutine for ExecDelete
|
* ExecDeleteAct -- subroutine for ExecDelete
|
||||||
*
|
*
|
||||||
* Actually delete the tuple from a plain table.
|
* Actually delete the tuple from a plain table.
|
||||||
*
|
*
|
||||||
|
* If the 'lockUpdated' flag is set and the target tuple is updated, then
|
||||||
|
* the latest version gets locked and fetched into the EPQ slot.
|
||||||
|
*
|
||||||
* Caller is in charge of doing EvalPlanQual as necessary
|
* Caller is in charge of doing EvalPlanQual as necessary
|
||||||
*/
|
*/
|
||||||
static TM_Result
|
static TM_Result
|
||||||
ExecDeleteAct(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
|
ExecDeleteAct(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
|
||||||
ItemPointer tupleid, bool changingPart)
|
ItemPointer tupleid, bool changingPart, bool lockUpdated)
|
||||||
{
|
{
|
||||||
EState *estate = context->estate;
|
EState *estate = context->estate;
|
||||||
|
GetEPQSlotArg slotArg = {context->epqstate, resultRelInfo};
|
||||||
|
LazyTupleTableSlot lazyEPQSlot,
|
||||||
|
*lazyEPQSlotPtr;
|
||||||
|
|
||||||
|
if (lockUpdated)
|
||||||
|
{
|
||||||
|
MAKE_LAZY_TTS(&lazyEPQSlot, GetEPQSlot, &slotArg);
|
||||||
|
lazyEPQSlotPtr = &lazyEPQSlot;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
lazyEPQSlotPtr = NULL;
|
||||||
|
}
|
||||||
return table_tuple_delete(resultRelInfo->ri_RelationDesc, tupleid,
|
return table_tuple_delete(resultRelInfo->ri_RelationDesc, tupleid,
|
||||||
estate->es_output_cid,
|
estate->es_output_cid,
|
||||||
estate->es_snapshot,
|
estate->es_snapshot,
|
||||||
estate->es_crosscheck_snapshot,
|
estate->es_crosscheck_snapshot,
|
||||||
true /* wait for commit */ ,
|
true /* wait for commit */ ,
|
||||||
&context->tmfd,
|
&context->tmfd,
|
||||||
changingPart);
|
changingPart,
|
||||||
|
lazyEPQSlotPtr);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -1488,7 +1524,8 @@ ExecDelete(ModifyTableContext *context,
|
||||||
* transaction-snapshot mode transactions.
|
* transaction-snapshot mode transactions.
|
||||||
*/
|
*/
|
||||||
ldelete:
|
ldelete:
|
||||||
result = ExecDeleteAct(context, resultRelInfo, tupleid, changingPart);
|
result = ExecDeleteAct(context, resultRelInfo, tupleid, changingPart,
|
||||||
|
!IsolationUsesXactSnapshot());
|
||||||
|
|
||||||
switch (result)
|
switch (result)
|
||||||
{
|
{
|
||||||
|
@ -1541,103 +1578,49 @@ ldelete:
|
||||||
errmsg("could not serialize access due to concurrent update")));
|
errmsg("could not serialize access due to concurrent update")));
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Already know that we're going to need to do EPQ, so
|
* ExecDeleteAct() has already locked the old tuple for
|
||||||
* fetch tuple directly into the right slot.
|
* us. Now we need to copy it to the right slot.
|
||||||
*/
|
*/
|
||||||
EvalPlanQualBegin(context->epqstate);
|
EvalPlanQualBegin(context->epqstate);
|
||||||
inputslot = EvalPlanQualSlot(context->epqstate, resultRelationDesc,
|
inputslot = EvalPlanQualSlot(context->epqstate, resultRelationDesc,
|
||||||
resultRelInfo->ri_RangeTableIndex);
|
resultRelInfo->ri_RangeTableIndex);
|
||||||
|
|
||||||
result = table_tuple_lock(resultRelationDesc, tupleid,
|
/*
|
||||||
estate->es_snapshot,
|
 * Save locked tuple for further processing of RETURNING
|
||||||
inputslot, estate->es_output_cid,
|
* clause.
|
||||||
LockTupleExclusive, LockWaitBlock,
|
*/
|
||||||
TUPLE_LOCK_FLAG_FIND_LAST_VERSION,
|
if (processReturning &&
|
||||||
&context->tmfd);
|
resultRelInfo->ri_projectReturning &&
|
||||||
|
!resultRelInfo->ri_FdwRoutine)
|
||||||
switch (result)
|
|
||||||
{
|
{
|
||||||
case TM_Ok:
|
TupleTableSlot *returningSlot;
|
||||||
Assert(context->tmfd.traversed);
|
|
||||||
|
|
||||||
/*
|
returningSlot = ExecGetReturningSlot(estate,
|
||||||
* Save locked tuple for further processing of
|
resultRelInfo);
|
||||||
* RETURNING clause.
|
ExecCopySlot(returningSlot, inputslot);
|
||||||
*/
|
ExecMaterializeSlot(returningSlot);
|
||||||
if (processReturning &&
|
|
||||||
resultRelInfo->ri_projectReturning &&
|
|
||||||
!resultRelInfo->ri_FdwRoutine)
|
|
||||||
{
|
|
||||||
TupleTableSlot *returningSlot;
|
|
||||||
|
|
||||||
returningSlot = ExecGetReturningSlot(estate, resultRelInfo);
|
|
||||||
ExecCopySlot(returningSlot, inputslot);
|
|
||||||
ExecMaterializeSlot(returningSlot);
|
|
||||||
}
|
|
||||||
|
|
||||||
epqslot = EvalPlanQual(context->epqstate,
|
|
||||||
resultRelationDesc,
|
|
||||||
resultRelInfo->ri_RangeTableIndex,
|
|
||||||
inputslot);
|
|
||||||
if (TupIsNull(epqslot))
|
|
||||||
/* Tuple not passing quals anymore, exiting... */
|
|
||||||
return NULL;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* If requested, skip delete and pass back the
|
|
||||||
* updated row.
|
|
||||||
*/
|
|
||||||
if (epqreturnslot)
|
|
||||||
{
|
|
||||||
*epqreturnslot = epqslot;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
goto ldelete;
|
|
||||||
|
|
||||||
case TM_SelfModified:
|
|
||||||
|
|
||||||
/*
|
|
||||||
* This can be reached when following an update
|
|
||||||
* chain from a tuple updated by another session,
|
|
||||||
* reaching a tuple that was already updated in
|
|
||||||
* this transaction. If previously updated by this
|
|
||||||
* command, ignore the delete, otherwise error
|
|
||||||
* out.
|
|
||||||
*
|
|
||||||
* See also TM_SelfModified response to
|
|
||||||
* table_tuple_delete() above.
|
|
||||||
*/
|
|
||||||
if (context->tmfd.cmax != estate->es_output_cid)
|
|
||||||
ereport(ERROR,
|
|
||||||
(errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION),
|
|
||||||
errmsg("tuple to be deleted was already modified by an operation triggered by the current command"),
|
|
||||||
errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows.")));
|
|
||||||
return NULL;
|
|
||||||
|
|
||||||
case TM_Deleted:
|
|
||||||
/* tuple already deleted; nothing to do */
|
|
||||||
return NULL;
|
|
||||||
|
|
||||||
default:
|
|
||||||
|
|
||||||
/*
|
|
||||||
* TM_Invisible should be impossible because we're
|
|
||||||
* waiting for updated row versions, and would
|
|
||||||
* already have errored out if the first version
|
|
||||||
* is invisible.
|
|
||||||
*
|
|
||||||
* TM_Updated should be impossible, because we're
|
|
||||||
* locking the latest version via
|
|
||||||
* TUPLE_LOCK_FLAG_FIND_LAST_VERSION.
|
|
||||||
*/
|
|
||||||
elog(ERROR, "unexpected table_tuple_lock status: %u",
|
|
||||||
result);
|
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Assert(false);
|
Assert(context->tmfd.traversed);
|
||||||
break;
|
epqslot = EvalPlanQual(context->epqstate,
|
||||||
|
resultRelationDesc,
|
||||||
|
resultRelInfo->ri_RangeTableIndex,
|
||||||
|
inputslot);
|
||||||
|
if (TupIsNull(epqslot))
|
||||||
|
/* Tuple not passing quals anymore, exiting... */
|
||||||
|
return NULL;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If requested, skip delete and pass back the updated
|
||||||
|
* row.
|
||||||
|
*/
|
||||||
|
if (epqreturnslot)
|
||||||
|
{
|
||||||
|
*epqreturnslot = epqslot;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
goto ldelete;
|
||||||
}
|
}
|
||||||
|
|
||||||
case TM_Deleted:
|
case TM_Deleted:
|
||||||
|
@ -1982,12 +1965,15 @@ ExecUpdatePrepareSlot(ResultRelInfo *resultRelInfo,
|
||||||
static TM_Result
|
static TM_Result
|
||||||
ExecUpdateAct(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
|
ExecUpdateAct(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
|
||||||
ItemPointer tupleid, HeapTuple oldtuple, TupleTableSlot *slot,
|
ItemPointer tupleid, HeapTuple oldtuple, TupleTableSlot *slot,
|
||||||
bool canSetTag, UpdateContext *updateCxt)
|
bool canSetTag, bool lockUpdated, UpdateContext *updateCxt)
|
||||||
{
|
{
|
||||||
EState *estate = context->estate;
|
EState *estate = context->estate;
|
||||||
Relation resultRelationDesc = resultRelInfo->ri_RelationDesc;
|
Relation resultRelationDesc = resultRelInfo->ri_RelationDesc;
|
||||||
bool partition_constraint_failed;
|
bool partition_constraint_failed;
|
||||||
TM_Result result;
|
TM_Result result;
|
||||||
|
GetEPQSlotArg slotArg = {context->epqstate, resultRelInfo};
|
||||||
|
LazyTupleTableSlot lazyEPQSlot,
|
||||||
|
*lazyEPQSlotPtr;
|
||||||
|
|
||||||
updateCxt->crossPartUpdate = false;
|
updateCxt->crossPartUpdate = false;
|
||||||
|
|
||||||
|
@ -2113,13 +2099,23 @@ lreplace:
|
||||||
* for referential integrity updates in transaction-snapshot mode
|
* for referential integrity updates in transaction-snapshot mode
|
||||||
* transactions.
|
* transactions.
|
||||||
*/
|
*/
|
||||||
|
if (lockUpdated)
|
||||||
|
{
|
||||||
|
MAKE_LAZY_TTS(&lazyEPQSlot, GetEPQSlot, &slotArg);
|
||||||
|
lazyEPQSlotPtr = &lazyEPQSlot;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
lazyEPQSlotPtr = NULL;
|
||||||
|
}
|
||||||
result = table_tuple_update(resultRelationDesc, tupleid, slot,
|
result = table_tuple_update(resultRelationDesc, tupleid, slot,
|
||||||
estate->es_output_cid,
|
estate->es_output_cid,
|
||||||
estate->es_snapshot,
|
estate->es_snapshot,
|
||||||
estate->es_crosscheck_snapshot,
|
estate->es_crosscheck_snapshot,
|
||||||
true /* wait for commit */ ,
|
true /* wait for commit */ ,
|
||||||
&context->tmfd, &updateCxt->lockmode,
|
&context->tmfd, &updateCxt->lockmode,
|
||||||
&updateCxt->updateIndexes);
|
&updateCxt->updateIndexes,
|
||||||
|
lazyEPQSlotPtr);
|
||||||
if (result == TM_Ok)
|
if (result == TM_Ok)
|
||||||
updateCxt->updated = true;
|
updateCxt->updated = true;
|
||||||
|
|
||||||
|
@ -2273,7 +2269,7 @@ ExecCrossPartitionUpdateForeignKey(ModifyTableContext *context,
|
||||||
static TupleTableSlot *
|
static TupleTableSlot *
|
||||||
ExecUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
|
ExecUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
|
||||||
ItemPointer tupleid, HeapTuple oldtuple, TupleTableSlot *slot,
|
ItemPointer tupleid, HeapTuple oldtuple, TupleTableSlot *slot,
|
||||||
bool canSetTag)
|
bool canSetTag, bool locked)
|
||||||
{
|
{
|
||||||
EState *estate = context->estate;
|
EState *estate = context->estate;
|
||||||
Relation resultRelationDesc = resultRelInfo->ri_RelationDesc;
|
Relation resultRelationDesc = resultRelInfo->ri_RelationDesc;
|
||||||
|
@ -2335,7 +2331,8 @@ ExecUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
|
||||||
*/
|
*/
|
||||||
redo_act:
|
redo_act:
|
||||||
result = ExecUpdateAct(context, resultRelInfo, tupleid, oldtuple, slot,
|
result = ExecUpdateAct(context, resultRelInfo, tupleid, oldtuple, slot,
|
||||||
canSetTag, &updateCxt);
|
canSetTag, !IsolationUsesXactSnapshot(),
|
||||||
|
&updateCxt);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If ExecUpdateAct reports that a cross-partition update was done,
|
* If ExecUpdateAct reports that a cross-partition update was done,
|
||||||
|
@ -2394,81 +2391,39 @@ redo_act:
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
|
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
|
||||||
errmsg("could not serialize access due to concurrent update")));
|
errmsg("could not serialize access due to concurrent update")));
|
||||||
|
Assert(!locked);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Already know that we're going to need to do EPQ, so
|
* ExecUpdateAct() has already locked the old tuple for
|
||||||
* fetch tuple directly into the right slot.
|
* us. Now we need to copy it to the right slot.
|
||||||
*/
|
*/
|
||||||
inputslot = EvalPlanQualSlot(context->epqstate, resultRelationDesc,
|
inputslot = EvalPlanQualSlot(context->epqstate, resultRelationDesc,
|
||||||
resultRelInfo->ri_RangeTableIndex);
|
resultRelInfo->ri_RangeTableIndex);
|
||||||
|
|
||||||
result = table_tuple_lock(resultRelationDesc, tupleid,
|
/* Make sure ri_oldTupleSlot is initialized. */
|
||||||
estate->es_snapshot,
|
if (unlikely(!resultRelInfo->ri_projectNewInfoValid))
|
||||||
inputslot, estate->es_output_cid,
|
ExecInitUpdateProjection(context->mtstate,
|
||||||
updateCxt.lockmode, LockWaitBlock,
|
resultRelInfo);
|
||||||
TUPLE_LOCK_FLAG_FIND_LAST_VERSION,
|
|
||||||
&context->tmfd);
|
|
||||||
|
|
||||||
switch (result)
|
/*
|
||||||
{
|
* Save the locked tuple for further calculation of the
|
||||||
case TM_Ok:
|
* new tuple.
|
||||||
Assert(context->tmfd.traversed);
|
*/
|
||||||
|
oldSlot = resultRelInfo->ri_oldTupleSlot;
|
||||||
|
ExecCopySlot(oldSlot, inputslot);
|
||||||
|
ExecMaterializeSlot(oldSlot);
|
||||||
|
Assert(context->tmfd.traversed);
|
||||||
|
|
||||||
/* Make sure ri_oldTupleSlot is initialized. */
|
epqslot = EvalPlanQual(context->epqstate,
|
||||||
if (unlikely(!resultRelInfo->ri_projectNewInfoValid))
|
resultRelationDesc,
|
||||||
ExecInitUpdateProjection(context->mtstate,
|
resultRelInfo->ri_RangeTableIndex,
|
||||||
resultRelInfo);
|
inputslot);
|
||||||
|
if (TupIsNull(epqslot))
|
||||||
/*
|
/* Tuple not passing quals anymore, exiting... */
|
||||||
* Save the locked tuple for further calculation
|
return NULL;
|
||||||
* of the new tuple.
|
slot = ExecGetUpdateNewTuple(resultRelInfo,
|
||||||
*/
|
epqslot, oldSlot);
|
||||||
oldSlot = resultRelInfo->ri_oldTupleSlot;
|
goto redo_act;
|
||||||
ExecCopySlot(oldSlot, inputslot);
|
|
||||||
ExecMaterializeSlot(oldSlot);
|
|
||||||
|
|
||||||
epqslot = EvalPlanQual(context->epqstate,
|
|
||||||
resultRelationDesc,
|
|
||||||
resultRelInfo->ri_RangeTableIndex,
|
|
||||||
inputslot);
|
|
||||||
if (TupIsNull(epqslot))
|
|
||||||
/* Tuple not passing quals anymore, exiting... */
|
|
||||||
return NULL;
|
|
||||||
|
|
||||||
slot = ExecGetUpdateNewTuple(resultRelInfo,
|
|
||||||
epqslot, oldSlot);
|
|
||||||
goto redo_act;
|
|
||||||
|
|
||||||
case TM_Deleted:
|
|
||||||
/* tuple already deleted; nothing to do */
|
|
||||||
return NULL;
|
|
||||||
|
|
||||||
case TM_SelfModified:
|
|
||||||
|
|
||||||
/*
|
|
||||||
* This can be reached when following an update
|
|
||||||
* chain from a tuple updated by another session,
|
|
||||||
* reaching a tuple that was already updated in
|
|
||||||
* this transaction. If previously modified by
|
|
||||||
* this command, ignore the redundant update,
|
|
||||||
* otherwise error out.
|
|
||||||
*
|
|
||||||
* See also TM_SelfModified response to
|
|
||||||
* table_tuple_update() above.
|
|
||||||
*/
|
|
||||||
if (context->tmfd.cmax != estate->es_output_cid)
|
|
||||||
ereport(ERROR,
|
|
||||||
(errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION),
|
|
||||||
errmsg("tuple to be updated was already modified by an operation triggered by the current command"),
|
|
||||||
errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows.")));
|
|
||||||
return NULL;
|
|
||||||
|
|
||||||
default:
|
|
||||||
/* see table_tuple_lock call in ExecDelete() */
|
|
||||||
elog(ERROR, "unexpected table_tuple_lock status: %u",
|
|
||||||
result);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
|
@ -2710,7 +2665,7 @@ ExecOnConflictUpdate(ModifyTableContext *context,
|
||||||
*returning = ExecUpdate(context, resultRelInfo,
|
*returning = ExecUpdate(context, resultRelInfo,
|
||||||
conflictTid, NULL,
|
conflictTid, NULL,
|
||||||
resultRelInfo->ri_onConflict->oc_ProjSlot,
|
resultRelInfo->ri_onConflict->oc_ProjSlot,
|
||||||
canSetTag);
|
canSetTag, true);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Clear out existing tuple, as there might not be another conflict among
|
* Clear out existing tuple, as there might not be another conflict among
|
||||||
|
@ -2913,7 +2868,7 @@ lmerge_matched:
|
||||||
break; /* concurrent update/delete */
|
break; /* concurrent update/delete */
|
||||||
}
|
}
|
||||||
result = ExecUpdateAct(context, resultRelInfo, tupleid, NULL,
|
result = ExecUpdateAct(context, resultRelInfo, tupleid, NULL,
|
||||||
newslot, false, &updateCxt);
|
newslot, false, false, &updateCxt);
|
||||||
if (result == TM_Ok && updateCxt.updated)
|
if (result == TM_Ok && updateCxt.updated)
|
||||||
{
|
{
|
||||||
ExecUpdateEpilogue(context, &updateCxt, resultRelInfo,
|
ExecUpdateEpilogue(context, &updateCxt, resultRelInfo,
|
||||||
|
@ -2931,7 +2886,8 @@ lmerge_matched:
|
||||||
return true; /* "do nothing" */
|
return true; /* "do nothing" */
|
||||||
break; /* concurrent update/delete */
|
break; /* concurrent update/delete */
|
||||||
}
|
}
|
||||||
result = ExecDeleteAct(context, resultRelInfo, tupleid, false);
|
result = ExecDeleteAct(context, resultRelInfo, tupleid,
|
||||||
|
false, false);
|
||||||
if (result == TM_Ok)
|
if (result == TM_Ok)
|
||||||
{
|
{
|
||||||
ExecDeleteEpilogue(context, resultRelInfo, tupleid, NULL,
|
ExecDeleteEpilogue(context, resultRelInfo, tupleid, NULL,
|
||||||
|
@ -3837,7 +3793,7 @@ ExecModifyTable(PlanState *pstate)
|
||||||
|
|
||||||
/* Now apply the update. */
|
/* Now apply the update. */
|
||||||
slot = ExecUpdate(&context, resultRelInfo, tupleid, oldtuple,
|
slot = ExecUpdate(&context, resultRelInfo, tupleid, oldtuple,
|
||||||
slot, node->canSetTag);
|
slot, node->canSetTag, false);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case CMD_DELETE:
|
case CMD_DELETE:
|
||||||
|
|
|
@ -530,7 +530,8 @@ typedef struct TableAmRoutine
|
||||||
Snapshot crosscheck,
|
Snapshot crosscheck,
|
||||||
bool wait,
|
bool wait,
|
||||||
TM_FailureData *tmfd,
|
TM_FailureData *tmfd,
|
||||||
bool changingPart);
|
bool changingPart,
|
||||||
|
LazyTupleTableSlot *lockedSlot);
|
||||||
|
|
||||||
/* see table_tuple_update() for reference about parameters */
|
/* see table_tuple_update() for reference about parameters */
|
||||||
TM_Result (*tuple_update) (Relation rel,
|
TM_Result (*tuple_update) (Relation rel,
|
||||||
|
@ -542,7 +543,8 @@ typedef struct TableAmRoutine
|
||||||
bool wait,
|
bool wait,
|
||||||
TM_FailureData *tmfd,
|
TM_FailureData *tmfd,
|
||||||
LockTupleMode *lockmode,
|
LockTupleMode *lockmode,
|
||||||
TU_UpdateIndexes *update_indexes);
|
TU_UpdateIndexes *update_indexes,
|
||||||
|
LazyTupleTableSlot *lockedSlot);
|
||||||
|
|
||||||
/* see table_tuple_lock() for reference about parameters */
|
/* see table_tuple_lock() for reference about parameters */
|
||||||
TM_Result (*tuple_lock) (Relation rel,
|
TM_Result (*tuple_lock) (Relation rel,
|
||||||
|
@ -1457,7 +1459,7 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Delete a tuple.
|
* Delete a tuple (or lock last tuple version if lockedSlot is given).
|
||||||
*
|
*
|
||||||
* NB: do not call this directly unless prepared to deal with
|
* NB: do not call this directly unless prepared to deal with
|
||||||
* concurrent-update conditions. Use simple_table_tuple_delete instead.
|
* concurrent-update conditions. Use simple_table_tuple_delete instead.
|
||||||
|
@ -1473,6 +1475,8 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
|
||||||
* tmfd - filled in failure cases (see below)
|
* tmfd - filled in failure cases (see below)
|
||||||
* changingPart - true iff the tuple is being moved to another partition
|
* changingPart - true iff the tuple is being moved to another partition
|
||||||
* table due to an update of the partition key. Otherwise, false.
|
* table due to an update of the partition key. Otherwise, false.
|
||||||
|
* lockedSlot - lazy slot to save the locked tuple if should lock the last
|
||||||
|
* row version during the concurrent update. NULL if not needed.
|
||||||
*
|
*
|
||||||
* Normal, successful return value is TM_Ok, which means we did actually
|
* Normal, successful return value is TM_Ok, which means we did actually
|
||||||
* delete it. Failure return codes are TM_SelfModified, TM_Updated, and
|
* delete it. Failure return codes are TM_SelfModified, TM_Updated, and
|
||||||
|
@ -1485,15 +1489,17 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
|
||||||
static inline TM_Result
|
static inline TM_Result
|
||||||
table_tuple_delete(Relation rel, ItemPointer tid, CommandId cid,
|
table_tuple_delete(Relation rel, ItemPointer tid, CommandId cid,
|
||||||
Snapshot snapshot, Snapshot crosscheck, bool wait,
|
Snapshot snapshot, Snapshot crosscheck, bool wait,
|
||||||
TM_FailureData *tmfd, bool changingPart)
|
TM_FailureData *tmfd, bool changingPart,
|
||||||
|
LazyTupleTableSlot *lockedSlot)
|
||||||
{
|
{
|
||||||
return rel->rd_tableam->tuple_delete(rel, tid, cid,
|
return rel->rd_tableam->tuple_delete(rel, tid, cid,
|
||||||
snapshot, crosscheck,
|
snapshot, crosscheck,
|
||||||
wait, tmfd, changingPart);
|
wait, tmfd, changingPart,
|
||||||
|
lockedSlot);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Update a tuple.
|
* Update a tuple (or lock last tuple version if lockedSlot is given).
|
||||||
*
|
*
|
||||||
* NB: do not call this directly unless you are prepared to deal with
|
* NB: do not call this directly unless you are prepared to deal with
|
||||||
* concurrent-update conditions. Use simple_table_tuple_update instead.
|
* concurrent-update conditions. Use simple_table_tuple_update instead.
|
||||||
|
@ -1511,7 +1517,9 @@ table_tuple_delete(Relation rel, ItemPointer tid, CommandId cid,
|
||||||
* lockmode - filled with lock mode acquired on tuple
|
* lockmode - filled with lock mode acquired on tuple
|
||||||
* update_indexes - in success cases this is set to true if new index entries
|
* update_indexes - in success cases this is set to true if new index entries
|
||||||
* are required for this tuple
|
* are required for this tuple
|
||||||
*
|
* lockedSlot - lazy slot to save the locked tuple if should lock the last
|
||||||
|
* row version during the concurrent update. NULL if not needed.
|
||||||
|
|
||||||
* Normal, successful return value is TM_Ok, which means we did actually
|
* Normal, successful return value is TM_Ok, which means we did actually
|
||||||
* update it. Failure return codes are TM_SelfModified, TM_Updated, and
|
* update it. Failure return codes are TM_SelfModified, TM_Updated, and
|
||||||
* TM_BeingModified (the last only possible if wait == false).
|
* TM_BeingModified (the last only possible if wait == false).
|
||||||
|
@ -1530,12 +1538,14 @@ static inline TM_Result
|
||||||
table_tuple_update(Relation rel, ItemPointer otid, TupleTableSlot *slot,
|
table_tuple_update(Relation rel, ItemPointer otid, TupleTableSlot *slot,
|
||||||
CommandId cid, Snapshot snapshot, Snapshot crosscheck,
|
CommandId cid, Snapshot snapshot, Snapshot crosscheck,
|
||||||
bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode,
|
bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode,
|
||||||
TU_UpdateIndexes *update_indexes)
|
TU_UpdateIndexes *update_indexes,
|
||||||
|
LazyTupleTableSlot *lockedSlot)
|
||||||
{
|
{
|
||||||
return rel->rd_tableam->tuple_update(rel, otid, slot,
|
return rel->rd_tableam->tuple_update(rel, otid, slot,
|
||||||
cid, snapshot, crosscheck,
|
cid, snapshot, crosscheck,
|
||||||
wait, tmfd,
|
wait, tmfd,
|
||||||
lockmode, update_indexes);
|
lockmode, update_indexes,
|
||||||
|
lockedSlot);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -300,6 +300,44 @@ typedef struct MinimalTupleTableSlot
|
||||||
#define TupIsNull(slot) \
|
#define TupIsNull(slot) \
|
||||||
((slot) == NULL || TTS_EMPTY(slot))
|
((slot) == NULL || TTS_EMPTY(slot))
|
||||||
|
|
||||||
|
/*----------
|
||||||
|
* LazyTupleTableSlot -- a lazy version of TupleTableSlot.
|
||||||
|
*
|
||||||
|
* Sometimes caller might need to pass to the function a slot, which most
|
||||||
|
* likely will reain undemanded. Preallocating such slot would be a waste of
|
||||||
|
* resources in the majority of cases. Lazy slot is aimed to resolve this
|
||||||
|
* problem. It is basically a promise to allocate the slot once it's needed.
|
||||||
|
* Once callee needs the slot, it could get it using LAZY_TTS_EVAL(lazySlot)
|
||||||
|
* macro.
|
||||||
|
*/
|
||||||
|
typedef struct
|
||||||
|
{
|
||||||
|
TupleTableSlot *slot; /* cached slot or NULL if not yet allocated */
|
||||||
|
TupleTableSlot *(*getSlot) (void *arg); /* callback for slot allocation */
|
||||||
|
void *getSlotArg; /* argument for the callback above */
|
||||||
|
} LazyTupleTableSlot;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* A constructor for the lazy slot.
|
||||||
|
*/
|
||||||
|
#define MAKE_LAZY_TTS(lazySlot, callback, arg) \
|
||||||
|
do { \
|
||||||
|
(lazySlot)->slot = NULL; \
|
||||||
|
(lazySlot)->getSlot = callback; \
|
||||||
|
(lazySlot)->getSlotArg = arg; \
|
||||||
|
} while (false)
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Macro for lazy slot evaluation. NULL lazy slot evaluates to NULL slot.
|
||||||
|
* Cached version is used if present. Use the callback otherwise.
|
||||||
|
*/
|
||||||
|
#define LAZY_TTS_EVAL(lazySlot) \
|
||||||
|
((lazySlot) ? \
|
||||||
|
((lazySlot)->slot ? \
|
||||||
|
(lazySlot)->slot : \
|
||||||
|
((lazySlot)->slot = (lazySlot)->getSlot((lazySlot)->getSlotArg))) : \
|
||||||
|
NULL)
|
||||||
|
|
||||||
/* in executor/execTuples.c */
|
/* in executor/execTuples.c */
|
||||||
extern TupleTableSlot *MakeTupleTableSlot(TupleDesc tupleDesc,
|
extern TupleTableSlot *MakeTupleTableSlot(TupleDesc tupleDesc,
|
||||||
const TupleTableSlotOps *tts_ops);
|
const TupleTableSlotOps *tts_ops);
|
||||||
|
|
|
@ -955,6 +955,7 @@ GenerationPointer
|
||||||
GenericCosts
|
GenericCosts
|
||||||
GenericXLogState
|
GenericXLogState
|
||||||
GeqoPrivateData
|
GeqoPrivateData
|
||||||
|
GetEPQSlotArg
|
||||||
GetForeignJoinPaths_function
|
GetForeignJoinPaths_function
|
||||||
GetForeignModifyBatchSize_function
|
GetForeignModifyBatchSize_function
|
||||||
GetForeignPaths_function
|
GetForeignPaths_function
|
||||||
|
@ -1399,6 +1400,7 @@ LagTracker
|
||||||
LargeObjectDesc
|
LargeObjectDesc
|
||||||
LastAttnumInfo
|
LastAttnumInfo
|
||||||
Latch
|
Latch
|
||||||
|
LazyTupleTableSlot
|
||||||
LerpFunc
|
LerpFunc
|
||||||
LexDescr
|
LexDescr
|
||||||
LexemeEntry
|
LexemeEntry
|
||||||
|
|
Loading…
Reference in New Issue