MERGE post-commit review
Review comments from Andres Freund * Consolidate code into AfterTriggerGetTransitionTable() * Rename nodeMerge.c to execMerge.c * Rename nodeMerge.h to execMerge.h * Move MERGE handling in ExecInitModifyTable() into a execMerge.c ExecInitMerge() * Move mt_merge_subcommands flags into execMerge.h * Rename opt_and_condition to opt_merge_when_and_condition * Wordsmith various comments Author: Pavan Deolasee Reviewer: Simon Riggs
This commit is contained in:
parent
1fd8690668
commit
4b2d44031f
|
@ -96,6 +96,12 @@ static HeapTuple ExecCallTriggerFunc(TriggerData *trigdata,
|
|||
FmgrInfo *finfo,
|
||||
Instrumentation *instr,
|
||||
MemoryContext per_tuple_context);
|
||||
static Tuplestorestate *AfterTriggerGetTransitionTable(int event,
|
||||
HeapTuple oldtup,
|
||||
HeapTuple newtup,
|
||||
TransitionCaptureState *transition_capture);
|
||||
static void TransitionTableAddTuple(HeapTuple heaptup, Tuplestorestate *tuplestore,
|
||||
TupleConversionMap *map);
|
||||
static void AfterTriggerSaveEvent(EState *estate, ResultRelInfo *relinfo,
|
||||
int event, bool row_trigger,
|
||||
HeapTuple oldtup, HeapTuple newtup,
|
||||
|
@ -3846,6 +3852,14 @@ struct AfterTriggersTableData
|
|||
bool before_trig_done; /* did we already queue BS triggers? */
|
||||
bool after_trig_done; /* did we already queue AS triggers? */
|
||||
AfterTriggerEventList after_trig_events; /* if so, saved list pointer */
|
||||
|
||||
/*
|
||||
* We maintain separate transition tables for UPDATE/INSERT/DELETE since
|
||||
* MERGE can run all three actions in a single statement. Note that UPDATE
|
||||
* needs both old and new transition tables whereas INSERT needs only new
|
||||
* and DELETE needs only old.
|
||||
*/
|
||||
|
||||
/* "old" transition table for UPDATE, if any */
|
||||
Tuplestorestate *old_upd_tuplestore;
|
||||
/* "new" transition table for UPDATE, if any */
|
||||
|
@ -5716,6 +5730,84 @@ AfterTriggerPendingOnRel(Oid relid)
|
|||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* Get the transition table for the given event and depending on whether we are
|
||||
* processing the old or the new tuple.
|
||||
*/
|
||||
static Tuplestorestate *
|
||||
AfterTriggerGetTransitionTable(int event,
|
||||
HeapTuple oldtup,
|
||||
HeapTuple newtup,
|
||||
TransitionCaptureState *transition_capture)
|
||||
{
|
||||
Tuplestorestate *tuplestore = NULL;
|
||||
bool delete_old_table = transition_capture->tcs_delete_old_table;
|
||||
bool update_old_table = transition_capture->tcs_update_old_table;
|
||||
bool update_new_table = transition_capture->tcs_update_new_table;
|
||||
bool insert_new_table = transition_capture->tcs_insert_new_table;;
|
||||
|
||||
/*
|
||||
* For INSERT events newtup should be non-NULL, for DELETE events
|
||||
* oldtup should be non-NULL, whereas for UPDATE events normally both
|
||||
* oldtup and newtup are non-NULL. But for UPDATE events fired for
|
||||
* capturing transition tuples during UPDATE partition-key row
|
||||
* movement, oldtup is NULL when the event is for a row being inserted,
|
||||
* whereas newtup is NULL when the event is for a row being deleted.
|
||||
*/
|
||||
Assert(!(event == TRIGGER_EVENT_DELETE && delete_old_table &&
|
||||
oldtup == NULL));
|
||||
Assert(!(event == TRIGGER_EVENT_INSERT && insert_new_table &&
|
||||
newtup == NULL));
|
||||
|
||||
/*
|
||||
* We're called either for the newtup or the oldtup, but not both at the
|
||||
* same time.
|
||||
*/
|
||||
Assert((oldtup != NULL) ^ (newtup != NULL));
|
||||
|
||||
if (oldtup != NULL)
|
||||
{
|
||||
if (event == TRIGGER_EVENT_DELETE && delete_old_table)
|
||||
tuplestore = transition_capture->tcs_private->old_del_tuplestore;
|
||||
else if (event == TRIGGER_EVENT_UPDATE && update_old_table)
|
||||
tuplestore = transition_capture->tcs_private->old_upd_tuplestore;
|
||||
}
|
||||
|
||||
if (newtup != NULL)
|
||||
{
|
||||
if (event == TRIGGER_EVENT_INSERT && insert_new_table)
|
||||
tuplestore = transition_capture->tcs_private->new_ins_tuplestore;
|
||||
else if (event == TRIGGER_EVENT_UPDATE && update_new_table)
|
||||
tuplestore = transition_capture->tcs_private->new_upd_tuplestore;
|
||||
}
|
||||
|
||||
return tuplestore;
|
||||
}
|
||||
|
||||
/*
|
||||
* Add the given heap tuple to the given tuplestore, applying the conversion
|
||||
* map if necessary.
|
||||
*/
|
||||
static void
|
||||
TransitionTableAddTuple(HeapTuple heaptup, Tuplestorestate *tuplestore,
|
||||
TupleConversionMap *map)
|
||||
{
|
||||
/*
|
||||
* Nothing needs to be done if we don't have a tuplestore.
|
||||
*/
|
||||
if (tuplestore == NULL)
|
||||
return;
|
||||
|
||||
if (map != NULL)
|
||||
{
|
||||
HeapTuple converted = do_convert_tuple(heaptup, map);
|
||||
|
||||
tuplestore_puttuple(tuplestore, converted);
|
||||
pfree(converted);
|
||||
}
|
||||
else
|
||||
tuplestore_puttuple(tuplestore, heaptup);
|
||||
}
|
||||
|
||||
/* ----------
|
||||
* AfterTriggerSaveEvent()
|
||||
|
@ -5777,95 +5869,37 @@ AfterTriggerSaveEvent(EState *estate, ResultRelInfo *relinfo,
|
|||
{
|
||||
HeapTuple original_insert_tuple = transition_capture->tcs_original_insert_tuple;
|
||||
TupleConversionMap *map = transition_capture->tcs_map;
|
||||
bool delete_old_table = transition_capture->tcs_delete_old_table;
|
||||
bool update_old_table = transition_capture->tcs_update_old_table;
|
||||
bool update_new_table = transition_capture->tcs_update_new_table;
|
||||
bool insert_new_table = transition_capture->tcs_insert_new_table;;
|
||||
|
||||
/*
|
||||
* For INSERT events newtup should be non-NULL, for DELETE events
|
||||
* oldtup should be non-NULL, whereas for UPDATE events normally both
|
||||
* oldtup and newtup are non-NULL. But for UPDATE events fired for
|
||||
* capturing transition tuples during UPDATE partition-key row
|
||||
* movement, oldtup is NULL when the event is for a row being inserted,
|
||||
* whereas newtup is NULL when the event is for a row being deleted.
|
||||
* Capture the old tuple in the appropriate transition table based on
|
||||
* the event.
|
||||
*/
|
||||
Assert(!(event == TRIGGER_EVENT_DELETE && delete_old_table &&
|
||||
oldtup == NULL));
|
||||
Assert(!(event == TRIGGER_EVENT_INSERT && insert_new_table &&
|
||||
newtup == NULL));
|
||||
|
||||
if (oldtup != NULL &&
|
||||
(event == TRIGGER_EVENT_DELETE && delete_old_table))
|
||||
if (oldtup != NULL)
|
||||
{
|
||||
Tuplestorestate *old_tuplestore;
|
||||
|
||||
old_tuplestore = transition_capture->tcs_private->old_del_tuplestore;
|
||||
|
||||
if (map != NULL)
|
||||
{
|
||||
HeapTuple converted = do_convert_tuple(oldtup, map);
|
||||
|
||||
tuplestore_puttuple(old_tuplestore, converted);
|
||||
pfree(converted);
|
||||
}
|
||||
else
|
||||
tuplestore_puttuple(old_tuplestore, oldtup);
|
||||
Tuplestorestate *tuplestore =
|
||||
AfterTriggerGetTransitionTable(event,
|
||||
oldtup,
|
||||
NULL,
|
||||
transition_capture);
|
||||
TransitionTableAddTuple(oldtup, tuplestore, map);
|
||||
}
|
||||
if (oldtup != NULL &&
|
||||
(event == TRIGGER_EVENT_UPDATE && update_old_table))
|
||||
|
||||
/*
|
||||
* Capture the new tuple in the appropriate transition table based on
|
||||
* the event.
|
||||
*/
|
||||
if (newtup != NULL)
|
||||
{
|
||||
Tuplestorestate *old_tuplestore;
|
||||
|
||||
old_tuplestore = transition_capture->tcs_private->old_upd_tuplestore;
|
||||
|
||||
if (map != NULL)
|
||||
{
|
||||
HeapTuple converted = do_convert_tuple(oldtup, map);
|
||||
|
||||
tuplestore_puttuple(old_tuplestore, converted);
|
||||
pfree(converted);
|
||||
}
|
||||
else
|
||||
tuplestore_puttuple(old_tuplestore, oldtup);
|
||||
}
|
||||
if (newtup != NULL &&
|
||||
(event == TRIGGER_EVENT_INSERT && insert_new_table))
|
||||
{
|
||||
Tuplestorestate *new_tuplestore;
|
||||
|
||||
new_tuplestore = transition_capture->tcs_private->new_ins_tuplestore;
|
||||
Tuplestorestate *tuplestore =
|
||||
AfterTriggerGetTransitionTable(event,
|
||||
NULL,
|
||||
newtup,
|
||||
transition_capture);
|
||||
|
||||
if (original_insert_tuple != NULL)
|
||||
tuplestore_puttuple(new_tuplestore, original_insert_tuple);
|
||||
else if (map != NULL)
|
||||
{
|
||||
HeapTuple converted = do_convert_tuple(newtup, map);
|
||||
|
||||
tuplestore_puttuple(new_tuplestore, converted);
|
||||
pfree(converted);
|
||||
}
|
||||
tuplestore_puttuple(tuplestore, original_insert_tuple);
|
||||
else
|
||||
tuplestore_puttuple(new_tuplestore, newtup);
|
||||
}
|
||||
if (newtup != NULL &&
|
||||
(event == TRIGGER_EVENT_UPDATE && update_new_table))
|
||||
{
|
||||
Tuplestorestate *new_tuplestore;
|
||||
|
||||
new_tuplestore = transition_capture->tcs_private->new_upd_tuplestore;
|
||||
|
||||
if (original_insert_tuple != NULL)
|
||||
tuplestore_puttuple(new_tuplestore, original_insert_tuple);
|
||||
else if (map != NULL)
|
||||
{
|
||||
HeapTuple converted = do_convert_tuple(newtup, map);
|
||||
|
||||
tuplestore_puttuple(new_tuplestore, converted);
|
||||
pfree(converted);
|
||||
}
|
||||
else
|
||||
tuplestore_puttuple(new_tuplestore, newtup);
|
||||
TransitionTableAddTuple(newtup, tuplestore, map);
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
@ -14,7 +14,7 @@ include $(top_builddir)/src/Makefile.global
|
|||
|
||||
OBJS = execAmi.o execCurrent.o execExpr.o execExprInterp.o \
|
||||
execGrouping.o execIndexing.o execJunk.o \
|
||||
execMain.o execParallel.o execPartition.o execProcnode.o \
|
||||
execMain.o execMerge.o execParallel.o execPartition.o execProcnode.o \
|
||||
execReplication.o execScan.o execSRF.o execTuples.o \
|
||||
execUtils.o functions.o instrument.o nodeAppend.o nodeAgg.o \
|
||||
nodeBitmapAnd.o nodeBitmapOr.o \
|
||||
|
@ -22,7 +22,7 @@ OBJS = execAmi.o execCurrent.o execExpr.o execExprInterp.o \
|
|||
nodeCustom.o nodeFunctionscan.o nodeGather.o \
|
||||
nodeHash.o nodeHashjoin.o nodeIndexscan.o nodeIndexonlyscan.o \
|
||||
nodeLimit.o nodeLockRows.o nodeGatherMerge.o \
|
||||
nodeMaterial.o nodeMergeAppend.o nodeMergejoin.o nodeMerge.o nodeModifyTable.o \
|
||||
nodeMaterial.o nodeMergeAppend.o nodeMergejoin.o nodeModifyTable.o \
|
||||
nodeNestloop.o nodeProjectSet.o nodeRecursiveunion.o nodeResult.o \
|
||||
nodeSamplescan.o nodeSeqscan.o nodeSetOp.o nodeSort.o nodeUnique.o \
|
||||
nodeValuesscan.o \
|
||||
|
|
|
@ -39,13 +39,14 @@ ModifyTable node visits each of those rows and marks the row deleted.
|
|||
|
||||
MERGE runs one generic plan that returns candidate target rows. Each row
|
||||
consists of a super-row that contains all the columns needed by any of the
|
||||
individual actions, plus a CTID and a TABLEOID junk columns. The CTID column is
|
||||
individual actions, plus CTID and TABLEOID junk columns. The CTID column is
|
||||
required to know if a matching target row was found or not and the TABLEOID
|
||||
column is needed to find the underlying target partition, in case the
|
||||
target table is a partition table. If the CTID column is set we attempt to
|
||||
activate WHEN MATCHED actions, or if it is NULL then we will attempt to
|
||||
activate WHEN NOT MATCHED actions. Once we know which action is activated we
|
||||
form the final result row and apply only those changes.
|
||||
target table is a partitioned table. When a matching target tuple is found, the
|
||||
CTID column identifies the matching target tuple and we attempt to activate
|
||||
WHEN MATCHED actions. If a matching tuple is not found, then the CTID column is
|
||||
NULL and we attempt to activate WHEN NOT MATCHED actions. Once we know which
|
||||
action is activated we form the final result row and apply only those changes.
|
||||
|
||||
XXX a great deal more documentation needs to be written here...
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* nodeMerge.c
|
||||
* execMerge.c
|
||||
* routines to handle Merge nodes relating to the MERGE command
|
||||
*
|
||||
* Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
|
||||
|
@ -8,7 +8,7 @@
|
|||
*
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/backend/executor/nodeMerge.c
|
||||
* src/backend/executor/execMerge.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
@ -22,7 +22,7 @@
|
|||
#include "executor/execPartition.h"
|
||||
#include "executor/executor.h"
|
||||
#include "executor/nodeModifyTable.h"
|
||||
#include "executor/nodeMerge.h"
|
||||
#include "executor/execMerge.h"
|
||||
#include "miscadmin.h"
|
||||
#include "nodes/nodeFuncs.h"
|
||||
#include "storage/bufmgr.h"
|
||||
|
@ -32,6 +32,110 @@
|
|||
#include "utils/rel.h"
|
||||
#include "utils/tqual.h"
|
||||
|
||||
static void ExecMergeNotMatched(ModifyTableState *mtstate, EState *estate,
|
||||
TupleTableSlot *slot);
|
||||
static bool ExecMergeMatched(ModifyTableState *mtstate, EState *estate,
|
||||
TupleTableSlot *slot, JunkFilter *junkfilter,
|
||||
ItemPointer tupleid);
|
||||
/*
|
||||
* Perform MERGE.
|
||||
*/
|
||||
void
|
||||
ExecMerge(ModifyTableState *mtstate, EState *estate, TupleTableSlot *slot,
|
||||
JunkFilter *junkfilter, ResultRelInfo *resultRelInfo)
|
||||
{
|
||||
ExprContext *econtext = mtstate->ps.ps_ExprContext;
|
||||
ItemPointer tupleid;
|
||||
ItemPointerData tuple_ctid;
|
||||
bool matched = false;
|
||||
char relkind;
|
||||
Datum datum;
|
||||
bool isNull;
|
||||
|
||||
relkind = resultRelInfo->ri_RelationDesc->rd_rel->relkind;
|
||||
Assert(relkind == RELKIND_RELATION ||
|
||||
relkind == RELKIND_PARTITIONED_TABLE);
|
||||
|
||||
/*
|
||||
* Reset per-tuple memory context to free any expression evaluation
|
||||
* storage allocated in the previous cycle.
|
||||
*/
|
||||
ResetExprContext(econtext);
|
||||
|
||||
/*
|
||||
* We run a JOIN between the target relation and the source relation to
|
||||
* find a set of candidate source rows that has matching row in the target
|
||||
* table and a set of candidate source rows that does not have matching
|
||||
* row in the target table. If the join returns us a tuple with target
|
||||
* relation's tid set, that implies that the join found a matching row for
|
||||
* the given source tuple. This case triggers the WHEN MATCHED clause of
|
||||
* the MERGE. Whereas a NULL in the target relation's ctid column
|
||||
* indicates a NOT MATCHED case.
|
||||
*/
|
||||
datum = ExecGetJunkAttribute(slot, junkfilter->jf_junkAttNo, &isNull);
|
||||
|
||||
if (!isNull)
|
||||
{
|
||||
matched = true;
|
||||
tupleid = (ItemPointer) DatumGetPointer(datum);
|
||||
tuple_ctid = *tupleid; /* be sure we don't free ctid!! */
|
||||
tupleid = &tuple_ctid;
|
||||
}
|
||||
else
|
||||
{
|
||||
matched = false;
|
||||
tupleid = NULL; /* we don't need it for INSERT actions */
|
||||
}
|
||||
|
||||
/*
|
||||
* If we are dealing with a WHEN MATCHED case, we execute the first action
|
||||
* for which the additional WHEN MATCHED AND quals pass. If an action
|
||||
* without quals is found, that action is executed.
|
||||
*
|
||||
* Similarly, if we are dealing with WHEN NOT MATCHED case, we look at the
|
||||
* given WHEN NOT MATCHED actions in sequence until one passes.
|
||||
*
|
||||
* Things get interesting in case of concurrent update/delete of the
|
||||
* target tuple. Such concurrent update/delete is detected while we are
|
||||
* executing a WHEN MATCHED action.
|
||||
*
|
||||
* A concurrent update can:
|
||||
*
|
||||
* 1. modify the target tuple so that it no longer satisfies the
|
||||
* additional quals attached to the current WHEN MATCHED action OR
|
||||
*
|
||||
* In this case, we are still dealing with a WHEN MATCHED case, but
|
||||
* we should recheck the list of WHEN MATCHED actions and choose the first
|
||||
* one that satisfies the new target tuple.
|
||||
*
|
||||
* 2. modify the target tuple so that the join quals no longer pass and
|
||||
* hence the source tuple no longer has a match.
|
||||
*
|
||||
* In the second case, the source tuple no longer matches the target tuple,
|
||||
* so we now instead find a qualifying WHEN NOT MATCHED action to execute.
|
||||
*
|
||||
* A concurrent delete, changes a WHEN MATCHED case to WHEN NOT MATCHED.
|
||||
*
|
||||
* ExecMergeMatched takes care of following the update chain and
|
||||
* re-finding the qualifying WHEN MATCHED action, as long as the updated
|
||||
* target tuple still satisfies the join quals i.e. it still remains a
|
||||
* WHEN MATCHED case. If the tuple gets deleted or the join quals fail, it
|
||||
* returns and we try ExecMergeNotMatched. Given that ExecMergeMatched
|
||||
* always make progress by following the update chain and we never switch
|
||||
* from ExecMergeNotMatched to ExecMergeMatched, there is no risk of a
|
||||
* livelock.
|
||||
*/
|
||||
if (matched)
|
||||
matched = ExecMergeMatched(mtstate, estate, slot, junkfilter, tupleid);
|
||||
|
||||
/*
|
||||
* Either we were dealing with a NOT MATCHED tuple or ExecMergeNotMatched()
|
||||
* returned "false", indicating the previously MATCHED tuple is no longer a
|
||||
* matching tuple.
|
||||
*/
|
||||
if (!matched)
|
||||
ExecMergeNotMatched(mtstate, estate, slot);
|
||||
}
|
||||
|
||||
/*
|
||||
* Check and execute the first qualifying MATCHED action. The current target
|
||||
|
@ -248,8 +352,9 @@ lmerge_matched:;
|
|||
|
||||
/*
|
||||
* This state should never be reached since the underlying
|
||||
* JOIN runs with a MVCC snapshot and should only return
|
||||
* rows visible to us.
|
||||
* JOIN runs with a MVCC snapshot and EvalPlanQual runs
|
||||
* with a dirty snapshot. So such a row should have never
|
||||
* been returned for MERGE.
|
||||
*/
|
||||
elog(ERROR, "unexpected invisible tuple");
|
||||
break;
|
||||
|
@ -392,10 +497,10 @@ ExecMergeNotMatched(ModifyTableState *mtstate, EState *estate,
|
|||
TupleTableSlot *myslot;
|
||||
|
||||
/*
|
||||
* We are dealing with NOT MATCHED tuple. Since for MERGE, partition tree
|
||||
* is not expanded for the result relation, we continue to work with the
|
||||
* currently active result relation, which should be of the root of the
|
||||
* partition tree.
|
||||
* We are dealing with a NOT MATCHED tuple. Since for MERGE, the partition
|
||||
* tree is not expanded for the result relation, we continue to work with
|
||||
* the currently active result relation, which corresponds to the root
|
||||
* of the partition tree.
|
||||
*/
|
||||
resultRelInfo = mtstate->resultRelInfo;
|
||||
|
||||
|
@ -474,102 +579,105 @@ ExecMergeNotMatched(ModifyTableState *mtstate, EState *estate,
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Perform MERGE.
|
||||
*/
|
||||
void
|
||||
ExecMerge(ModifyTableState *mtstate, EState *estate, TupleTableSlot *slot,
|
||||
JunkFilter *junkfilter, ResultRelInfo *resultRelInfo)
|
||||
ExecInitMerge(ModifyTableState *mtstate, EState *estate,
|
||||
ResultRelInfo *resultRelInfo)
|
||||
{
|
||||
ExprContext *econtext = mtstate->ps.ps_ExprContext;
|
||||
ItemPointer tupleid;
|
||||
ItemPointerData tuple_ctid;
|
||||
bool matched = false;
|
||||
char relkind;
|
||||
Datum datum;
|
||||
bool isNull;
|
||||
ListCell *l;
|
||||
ExprContext *econtext;
|
||||
List *mergeMatchedActionStates = NIL;
|
||||
List *mergeNotMatchedActionStates = NIL;
|
||||
TupleDesc relationDesc = resultRelInfo->ri_RelationDesc->rd_att;
|
||||
ModifyTable *node = (ModifyTable *) mtstate->ps.plan;
|
||||
|
||||
relkind = resultRelInfo->ri_RelationDesc->rd_rel->relkind;
|
||||
Assert(relkind == RELKIND_RELATION ||
|
||||
relkind == RELKIND_PARTITIONED_TABLE);
|
||||
if (node->mergeActionList == NIL)
|
||||
return;
|
||||
|
||||
mtstate->mt_merge_subcommands = 0;
|
||||
|
||||
if (mtstate->ps.ps_ExprContext == NULL)
|
||||
ExecAssignExprContext(estate, &mtstate->ps);
|
||||
|
||||
econtext = mtstate->ps.ps_ExprContext;
|
||||
|
||||
/* initialize slot for the existing tuple */
|
||||
Assert(mtstate->mt_existing == NULL);
|
||||
mtstate->mt_existing =
|
||||
ExecInitExtraTupleSlot(mtstate->ps.state,
|
||||
mtstate->mt_partition_tuple_routing ?
|
||||
NULL : relationDesc);
|
||||
|
||||
/* initialize slot for merge actions */
|
||||
Assert(mtstate->mt_mergeproj == NULL);
|
||||
mtstate->mt_mergeproj =
|
||||
ExecInitExtraTupleSlot(mtstate->ps.state,
|
||||
mtstate->mt_partition_tuple_routing ?
|
||||
NULL : relationDesc);
|
||||
|
||||
/*
|
||||
* Reset per-tuple memory context to free any expression evaluation
|
||||
* storage allocated in the previous cycle.
|
||||
* Create a MergeActionState for each action on the mergeActionList
|
||||
* and add it to either a list of matched actions or not-matched
|
||||
* actions.
|
||||
*/
|
||||
ResetExprContext(econtext);
|
||||
|
||||
/*
|
||||
* We run a JOIN between the target relation and the source relation to
|
||||
* find a set of candidate source rows that has matching row in the target
|
||||
* table and a set of candidate source rows that does not have matching
|
||||
* row in the target table. If the join returns us a tuple with target
|
||||
* relation's tid set, that implies that the join found a matching row for
|
||||
* the given source tuple. This case triggers the WHEN MATCHED clause of
|
||||
* the MERGE. Whereas a NULL in the target relation's ctid column
|
||||
* indicates a NOT MATCHED case.
|
||||
*/
|
||||
datum = ExecGetJunkAttribute(slot, junkfilter->jf_junkAttNo, &isNull);
|
||||
|
||||
if (!isNull)
|
||||
foreach(l, node->mergeActionList)
|
||||
{
|
||||
matched = true;
|
||||
tupleid = (ItemPointer) DatumGetPointer(datum);
|
||||
tuple_ctid = *tupleid; /* be sure we don't free ctid!! */
|
||||
tupleid = &tuple_ctid;
|
||||
}
|
||||
else
|
||||
{
|
||||
matched = false;
|
||||
tupleid = NULL; /* we don't need it for INSERT actions */
|
||||
}
|
||||
MergeAction *action = (MergeAction *) lfirst(l);
|
||||
MergeActionState *action_state = makeNode(MergeActionState);
|
||||
TupleDesc tupDesc;
|
||||
|
||||
/*
|
||||
* If we are dealing with a WHEN MATCHED case, we execute the first action
|
||||
* for which the additional WHEN MATCHED AND quals pass. If an action
|
||||
* without quals is found, that action is executed.
|
||||
*
|
||||
* Similarly, if we are dealing with WHEN NOT MATCHED case, we look at the
|
||||
* given WHEN NOT MATCHED actions in sequence until one passes.
|
||||
*
|
||||
* Things get interesting in case of concurrent update/delete of the
|
||||
* target tuple. Such concurrent update/delete is detected while we are
|
||||
* executing a WHEN MATCHED action.
|
||||
*
|
||||
* A concurrent update can:
|
||||
*
|
||||
* 1. modify the target tuple so that it no longer satisfies the
|
||||
* additional quals attached to the current WHEN MATCHED action OR
|
||||
*
|
||||
* In this case, we are still dealing with a WHEN MATCHED case, but
|
||||
* we should recheck the list of WHEN MATCHED actions and choose the first
|
||||
* one that satisfies the new target tuple.
|
||||
*
|
||||
* 2. modify the target tuple so that the join quals no longer pass and
|
||||
* hence the source tuple no longer has a match.
|
||||
*
|
||||
* In the second case, the source tuple no longer matches the target tuple,
|
||||
* so we now instead find a qualifying WHEN NOT MATCHED action to execute.
|
||||
*
|
||||
* A concurrent delete, changes a WHEN MATCHED case to WHEN NOT MATCHED.
|
||||
*
|
||||
* ExecMergeMatched takes care of following the update chain and
|
||||
* re-finding the qualifying WHEN MATCHED action, as long as the updated
|
||||
* target tuple still satisfies the join quals i.e. it still remains a
|
||||
* WHEN MATCHED case. If the tuple gets deleted or the join quals fail, it
|
||||
* returns and we try ExecMergeNotMatched. Given that ExecMergeMatched
|
||||
* always make progress by following the update chain and we never switch
|
||||
* from ExecMergeNotMatched to ExecMergeMatched, there is no risk of a
|
||||
* livelock.
|
||||
*/
|
||||
if (matched)
|
||||
matched = ExecMergeMatched(mtstate, estate, slot, junkfilter, tupleid);
|
||||
action_state->matched = action->matched;
|
||||
action_state->commandType = action->commandType;
|
||||
action_state->whenqual = ExecInitQual((List *) action->qual,
|
||||
&mtstate->ps);
|
||||
|
||||
/*
|
||||
* Either we were dealing with a NOT MATCHED tuple or ExecMergeNotMatched()
|
||||
* returned "false", indicating the previously MATCHED tuple is no longer a
|
||||
* matching tuple.
|
||||
*/
|
||||
if (!matched)
|
||||
ExecMergeNotMatched(mtstate, estate, slot);
|
||||
/* create target slot for this action's projection */
|
||||
tupDesc = ExecTypeFromTL((List *) action->targetList,
|
||||
resultRelInfo->ri_RelationDesc->rd_rel->relhasoids);
|
||||
action_state->tupDesc = tupDesc;
|
||||
|
||||
/* build action projection state */
|
||||
action_state->proj =
|
||||
ExecBuildProjectionInfo(action->targetList, econtext,
|
||||
mtstate->mt_mergeproj, &mtstate->ps,
|
||||
resultRelInfo->ri_RelationDesc->rd_att);
|
||||
|
||||
/*
|
||||
* We create two lists - one for WHEN MATCHED actions and one
|
||||
* for WHEN NOT MATCHED actions - and stick the
|
||||
* MergeActionState into the appropriate list.
|
||||
*/
|
||||
if (action_state->matched)
|
||||
mergeMatchedActionStates =
|
||||
lappend(mergeMatchedActionStates, action_state);
|
||||
else
|
||||
mergeNotMatchedActionStates =
|
||||
lappend(mergeNotMatchedActionStates, action_state);
|
||||
|
||||
switch (action->commandType)
|
||||
{
|
||||
case CMD_INSERT:
|
||||
ExecCheckPlanOutput(resultRelInfo->ri_RelationDesc,
|
||||
action->targetList);
|
||||
mtstate->mt_merge_subcommands |= MERGE_INSERT;
|
||||
break;
|
||||
case CMD_UPDATE:
|
||||
ExecCheckPlanOutput(resultRelInfo->ri_RelationDesc,
|
||||
action->targetList);
|
||||
mtstate->mt_merge_subcommands |= MERGE_UPDATE;
|
||||
break;
|
||||
case CMD_DELETE:
|
||||
mtstate->mt_merge_subcommands |= MERGE_DELETE;
|
||||
break;
|
||||
case CMD_NOTHING:
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "unknown operation");
|
||||
break;
|
||||
}
|
||||
|
||||
resultRelInfo->ri_mergeState->matchedActionStates =
|
||||
mergeMatchedActionStates;
|
||||
resultRelInfo->ri_mergeState->notMatchedActionStates =
|
||||
mergeNotMatchedActionStates;
|
||||
}
|
||||
}
|
|
@ -313,6 +313,10 @@ ExecFindPartition(ResultRelInfo *resultRelInfo, PartitionDispatch *pd,
|
|||
/*
|
||||
* Given OID of the partition leaf, return the index of the leaf in the
|
||||
* partition hierarchy.
|
||||
*
|
||||
* NB: This is an O(N) operation. Unfortunately, there are many other problem
|
||||
* areas with more than a handful of partitions, so we don't try to optimise this
|
||||
* code right now.
|
||||
*/
|
||||
int
|
||||
ExecFindPartitionByOid(PartitionTupleRouting *proute, Oid partoid)
|
||||
|
@ -325,7 +329,10 @@ ExecFindPartitionByOid(PartitionTupleRouting *proute, Oid partoid)
|
|||
break;
|
||||
}
|
||||
|
||||
Assert(i < proute->num_partitions);
|
||||
if (i >= proute->num_partitions)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INTERNAL_ERROR),
|
||||
errmsg("no partition found for OID %u", partoid)));
|
||||
return i;
|
||||
}
|
||||
|
||||
|
|
|
@ -42,7 +42,7 @@
|
|||
#include "commands/trigger.h"
|
||||
#include "executor/execPartition.h"
|
||||
#include "executor/executor.h"
|
||||
#include "executor/nodeMerge.h"
|
||||
#include "executor/execMerge.h"
|
||||
#include "executor/nodeModifyTable.h"
|
||||
#include "foreign/fdwapi.h"
|
||||
#include "miscadmin.h"
|
||||
|
@ -69,11 +69,6 @@ static void ExecSetupChildParentMapForSubplan(ModifyTableState *mtstate);
|
|||
static TupleConversionMap *tupconv_map_for_subplan(ModifyTableState *node,
|
||||
int whichplan);
|
||||
|
||||
/* flags for mt_merge_subcommands */
|
||||
#define MERGE_INSERT 0x01
|
||||
#define MERGE_UPDATE 0x02
|
||||
#define MERGE_DELETE 0x04
|
||||
|
||||
/*
|
||||
* Verify that the tuples to be produced by INSERT or UPDATE match the
|
||||
* target relation's rowtype
|
||||
|
@ -86,7 +81,7 @@ static TupleConversionMap *tupconv_map_for_subplan(ModifyTableState *node,
|
|||
* The plan output is represented by its targetlist, because that makes
|
||||
* handling the dropped-column case easier.
|
||||
*/
|
||||
static void
|
||||
void
|
||||
ExecCheckPlanOutput(Relation resultRel, List *targetList)
|
||||
{
|
||||
TupleDesc resultDesc = RelationGetDescr(resultRel);
|
||||
|
@ -2660,104 +2655,8 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
|
|||
}
|
||||
|
||||
resultRelInfo = mtstate->resultRelInfo;
|
||||
|
||||
if (node->mergeActionList)
|
||||
{
|
||||
ListCell *l;
|
||||
ExprContext *econtext;
|
||||
List *mergeMatchedActionStates = NIL;
|
||||
List *mergeNotMatchedActionStates = NIL;
|
||||
TupleDesc relationDesc = resultRelInfo->ri_RelationDesc->rd_att;
|
||||
|
||||
mtstate->mt_merge_subcommands = 0;
|
||||
|
||||
if (mtstate->ps.ps_ExprContext == NULL)
|
||||
ExecAssignExprContext(estate, &mtstate->ps);
|
||||
|
||||
econtext = mtstate->ps.ps_ExprContext;
|
||||
|
||||
/* initialize slot for the existing tuple */
|
||||
Assert(mtstate->mt_existing == NULL);
|
||||
mtstate->mt_existing =
|
||||
ExecInitExtraTupleSlot(mtstate->ps.state,
|
||||
mtstate->mt_partition_tuple_routing ?
|
||||
NULL : relationDesc);
|
||||
|
||||
/* initialize slot for merge actions */
|
||||
Assert(mtstate->mt_mergeproj == NULL);
|
||||
mtstate->mt_mergeproj =
|
||||
ExecInitExtraTupleSlot(mtstate->ps.state,
|
||||
mtstate->mt_partition_tuple_routing ?
|
||||
NULL : relationDesc);
|
||||
|
||||
/*
|
||||
* Create a MergeActionState for each action on the mergeActionList
|
||||
* and add it to either a list of matched actions or not-matched
|
||||
* actions.
|
||||
*/
|
||||
foreach(l, node->mergeActionList)
|
||||
{
|
||||
MergeAction *action = (MergeAction *) lfirst(l);
|
||||
MergeActionState *action_state = makeNode(MergeActionState);
|
||||
TupleDesc tupDesc;
|
||||
|
||||
action_state->matched = action->matched;
|
||||
action_state->commandType = action->commandType;
|
||||
action_state->whenqual = ExecInitQual((List *) action->qual,
|
||||
&mtstate->ps);
|
||||
|
||||
/* create target slot for this action's projection */
|
||||
tupDesc = ExecTypeFromTL((List *) action->targetList,
|
||||
resultRelInfo->ri_RelationDesc->rd_rel->relhasoids);
|
||||
action_state->tupDesc = tupDesc;
|
||||
|
||||
/* build action projection state */
|
||||
action_state->proj =
|
||||
ExecBuildProjectionInfo(action->targetList, econtext,
|
||||
mtstate->mt_mergeproj, &mtstate->ps,
|
||||
resultRelInfo->ri_RelationDesc->rd_att);
|
||||
|
||||
/*
|
||||
* We create two lists - one for WHEN MATCHED actions and one
|
||||
* for WHEN NOT MATCHED actions - and stick the
|
||||
* MergeActionState into the appropriate list.
|
||||
*/
|
||||
if (action_state->matched)
|
||||
mergeMatchedActionStates =
|
||||
lappend(mergeMatchedActionStates, action_state);
|
||||
else
|
||||
mergeNotMatchedActionStates =
|
||||
lappend(mergeNotMatchedActionStates, action_state);
|
||||
|
||||
switch (action->commandType)
|
||||
{
|
||||
case CMD_INSERT:
|
||||
ExecCheckPlanOutput(resultRelInfo->ri_RelationDesc,
|
||||
action->targetList);
|
||||
mtstate->mt_merge_subcommands |= MERGE_INSERT;
|
||||
break;
|
||||
case CMD_UPDATE:
|
||||
ExecCheckPlanOutput(resultRelInfo->ri_RelationDesc,
|
||||
action->targetList);
|
||||
mtstate->mt_merge_subcommands |= MERGE_UPDATE;
|
||||
break;
|
||||
case CMD_DELETE:
|
||||
mtstate->mt_merge_subcommands |= MERGE_DELETE;
|
||||
break;
|
||||
case CMD_NOTHING:
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "unknown operation");
|
||||
break;
|
||||
}
|
||||
|
||||
resultRelInfo->ri_mergeState->matchedActionStates =
|
||||
mergeMatchedActionStates;
|
||||
resultRelInfo->ri_mergeState->notMatchedActionStates =
|
||||
mergeNotMatchedActionStates;
|
||||
|
||||
}
|
||||
}
|
||||
if (mtstate->operation == CMD_MERGE)
|
||||
ExecInitMerge(mtstate, estate, resultRelInfo);
|
||||
|
||||
/* select first subplan */
|
||||
mtstate->mt_whichplan = 0;
|
||||
|
|
|
@ -852,14 +852,14 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset)
|
|||
}
|
||||
|
||||
/*
|
||||
* The MERGE produces the target rows by performing a right
|
||||
* join between the target relation and the source relation
|
||||
* (which could be a plain relation or a subquery). The INSERT
|
||||
* and UPDATE actions of the MERGE requires access to the
|
||||
* columns from the source relation. We arrange things so that
|
||||
* the source relation attributes are available as INNER_VAR
|
||||
* and the target relation attributes are available from the
|
||||
* scan tuple.
|
||||
* The MERGE statement produces the target rows by performing a
|
||||
* right join between the target relation and the source
|
||||
* relation (which could be a plain relation or a subquery).
|
||||
* The INSERT and UPDATE actions of the MERGE statement
|
||||
* require access to the columns from the source relation. We
|
||||
* arrange things so that the source relation attributes are
|
||||
* available as INNER_VAR and the target relation attributes
|
||||
* are available from the scan tuple.
|
||||
*/
|
||||
if (splan->mergeActionList != NIL)
|
||||
{
|
||||
|
|
|
@ -585,7 +585,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
|
|||
%type <list> hash_partbound partbound_datum_list range_datum_list
|
||||
%type <defelt> hash_partbound_elem
|
||||
|
||||
%type <node> merge_when_clause opt_and_condition
|
||||
%type <node> merge_when_clause opt_merge_when_and_condition
|
||||
%type <list> merge_when_list
|
||||
%type <node> merge_update merge_delete merge_insert
|
||||
|
||||
|
@ -11129,7 +11129,7 @@ merge_when_list:
|
|||
;
|
||||
|
||||
merge_when_clause:
|
||||
WHEN MATCHED opt_and_condition THEN merge_update
|
||||
WHEN MATCHED opt_merge_when_and_condition THEN merge_update
|
||||
{
|
||||
MergeAction *m = makeNode(MergeAction);
|
||||
|
||||
|
@ -11140,7 +11140,7 @@ merge_when_clause:
|
|||
|
||||
$$ = (Node *)m;
|
||||
}
|
||||
| WHEN MATCHED opt_and_condition THEN merge_delete
|
||||
| WHEN MATCHED opt_merge_when_and_condition THEN merge_delete
|
||||
{
|
||||
MergeAction *m = makeNode(MergeAction);
|
||||
|
||||
|
@ -11151,7 +11151,7 @@ merge_when_clause:
|
|||
|
||||
$$ = (Node *)m;
|
||||
}
|
||||
| WHEN NOT MATCHED opt_and_condition THEN merge_insert
|
||||
| WHEN NOT MATCHED opt_merge_when_and_condition THEN merge_insert
|
||||
{
|
||||
MergeAction *m = makeNode(MergeAction);
|
||||
|
||||
|
@ -11162,7 +11162,7 @@ merge_when_clause:
|
|||
|
||||
$$ = (Node *)m;
|
||||
}
|
||||
| WHEN NOT MATCHED opt_and_condition THEN DO NOTHING
|
||||
| WHEN NOT MATCHED opt_merge_when_and_condition THEN DO NOTHING
|
||||
{
|
||||
MergeAction *m = makeNode(MergeAction);
|
||||
|
||||
|
@ -11175,7 +11175,7 @@ merge_when_clause:
|
|||
}
|
||||
;
|
||||
|
||||
opt_and_condition:
|
||||
opt_merge_when_and_condition:
|
||||
AND a_expr { $$ = $2; }
|
||||
| { $$ = NULL; }
|
||||
;
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* execMerge.h
|
||||
*
|
||||
*
|
||||
* Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
|
||||
* Portions Copyright (c) 1994, Regents of the University of California
|
||||
*
|
||||
* src/include/executor/execMerge.h
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#ifndef EXECMERGE_H
|
||||
#define EXECMERGE_H
|
||||
|
||||
#include "nodes/execnodes.h"
|
||||
|
||||
/* flags for mt_merge_subcommands */
|
||||
#define MERGE_INSERT 0x01
|
||||
#define MERGE_UPDATE 0x02
|
||||
#define MERGE_DELETE 0x04
|
||||
|
||||
extern void ExecMerge(ModifyTableState *mtstate, EState *estate,
|
||||
TupleTableSlot *slot, JunkFilter *junkfilter,
|
||||
ResultRelInfo *resultRelInfo);
|
||||
|
||||
extern void ExecInitMerge(ModifyTableState *mtstate,
|
||||
EState *estate,
|
||||
ResultRelInfo *resultRelInfo);
|
||||
|
||||
#endif /* EXECMERGE_H */
|
|
@ -1,22 +0,0 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* nodeMerge.h
|
||||
*
|
||||
*
|
||||
* Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
|
||||
* Portions Copyright (c) 1994, Regents of the University of California
|
||||
*
|
||||
* src/include/executor/nodeMerge.h
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#ifndef NODEMERGE_H
|
||||
#define NODEMERGE_H
|
||||
|
||||
#include "nodes/execnodes.h"
|
||||
|
||||
extern void
|
||||
ExecMerge(ModifyTableState *mtstate, EState *estate, TupleTableSlot *slot,
|
||||
JunkFilter *junkfilter, ResultRelInfo *resultRelInfo);
|
||||
|
||||
#endif /* NODEMERGE_H */
|
|
@ -39,5 +39,6 @@ extern TupleTableSlot *ExecInsert(ModifyTableState *mtstate,
|
|||
EState *estate,
|
||||
MergeActionState *actionState,
|
||||
bool canSetTag);
|
||||
extern void ExecCheckPlanOutput(Relation resultRel, List *targetList);
|
||||
|
||||
#endif /* NODEMODIFYTABLE_H */
|
||||
|
|
Loading…
Reference in New Issue