MERGE post-commit review

Review comments from Andres Freund

* Consolidate code into AfterTriggerGetTransitionTable()
* Rename nodeMerge.c to execMerge.c
* Rename nodeMerge.h to execMerge.h
* Move MERGE handling in ExecInitModifyTable()
  into a execMerge.c ExecInitMerge()
* Move mt_merge_subcommands flags into execMerge.h
* Rename opt_and_condition to opt_merge_when_and_condition
* Wordsmith various comments

Author: Pavan Deolasee
Reviewer: Simon Riggs
This commit is contained in:
Simon Riggs 2018-04-05 09:54:07 +01:00
parent 1fd8690668
commit 4b2d44031f
11 changed files with 386 additions and 327 deletions

View File

@ -96,6 +96,12 @@ static HeapTuple ExecCallTriggerFunc(TriggerData *trigdata,
FmgrInfo *finfo, FmgrInfo *finfo,
Instrumentation *instr, Instrumentation *instr,
MemoryContext per_tuple_context); MemoryContext per_tuple_context);
static Tuplestorestate *AfterTriggerGetTransitionTable(int event,
HeapTuple oldtup,
HeapTuple newtup,
TransitionCaptureState *transition_capture);
static void TransitionTableAddTuple(HeapTuple heaptup, Tuplestorestate *tuplestore,
TupleConversionMap *map);
static void AfterTriggerSaveEvent(EState *estate, ResultRelInfo *relinfo, static void AfterTriggerSaveEvent(EState *estate, ResultRelInfo *relinfo,
int event, bool row_trigger, int event, bool row_trigger,
HeapTuple oldtup, HeapTuple newtup, HeapTuple oldtup, HeapTuple newtup,
@ -3846,6 +3852,14 @@ struct AfterTriggersTableData
bool before_trig_done; /* did we already queue BS triggers? */ bool before_trig_done; /* did we already queue BS triggers? */
bool after_trig_done; /* did we already queue AS triggers? */ bool after_trig_done; /* did we already queue AS triggers? */
AfterTriggerEventList after_trig_events; /* if so, saved list pointer */ AfterTriggerEventList after_trig_events; /* if so, saved list pointer */
/*
* We maintain separate transaction tables for UPDATE/INSERT/DELETE since
* MERGE can run all three actions in a single statement. Note that UPDATE
* needs both old and new transition tables whereas INSERT needs only new
* and DELETE needs only old.
*/
/* "old" transition table for UPDATE, if any */ /* "old" transition table for UPDATE, if any */
Tuplestorestate *old_upd_tuplestore; Tuplestorestate *old_upd_tuplestore;
/* "new" transition table for UPDATE, if any */ /* "new" transition table for UPDATE, if any */
@ -5716,6 +5730,84 @@ AfterTriggerPendingOnRel(Oid relid)
return false; return false;
} }
/*
* Get the transition table for the given event and depending on whether we are
* processing the old or the new tuple.
*/
static Tuplestorestate *
AfterTriggerGetTransitionTable(int event,
HeapTuple oldtup,
HeapTuple newtup,
TransitionCaptureState *transition_capture)
{
Tuplestorestate *tuplestore = NULL;
bool delete_old_table = transition_capture->tcs_delete_old_table;
bool update_old_table = transition_capture->tcs_update_old_table;
bool update_new_table = transition_capture->tcs_update_new_table;
bool insert_new_table = transition_capture->tcs_insert_new_table;;
/*
* For INSERT events newtup should be non-NULL, for DELETE events
* oldtup should be non-NULL, whereas for UPDATE events normally both
* oldtup and newtup are non-NULL. But for UPDATE events fired for
* capturing transition tuples during UPDATE partition-key row
* movement, oldtup is NULL when the event is for a row being inserted,
* whereas newtup is NULL when the event is for a row being deleted.
*/
Assert(!(event == TRIGGER_EVENT_DELETE && delete_old_table &&
oldtup == NULL));
Assert(!(event == TRIGGER_EVENT_INSERT && insert_new_table &&
newtup == NULL));
/*
* We're called either for the newtup or the oldtup, but not both at the
* same time.
*/
Assert((oldtup != NULL) ^ (newtup != NULL));
if (oldtup != NULL)
{
if (event == TRIGGER_EVENT_DELETE && delete_old_table)
tuplestore = transition_capture->tcs_private->old_del_tuplestore;
else if (event == TRIGGER_EVENT_UPDATE && update_old_table)
tuplestore = transition_capture->tcs_private->old_upd_tuplestore;
}
if (newtup != NULL)
{
if (event == TRIGGER_EVENT_INSERT && insert_new_table)
tuplestore = transition_capture->tcs_private->new_ins_tuplestore;
else if (event == TRIGGER_EVENT_UPDATE && update_new_table)
tuplestore = transition_capture->tcs_private->new_upd_tuplestore;
}
return tuplestore;
}
/*
* Add the given heap tuple to the given tuplestore, applying the conversion
* map if necessary.
*/
static void
TransitionTableAddTuple(HeapTuple heaptup, Tuplestorestate *tuplestore,
TupleConversionMap *map)
{
/*
* Nothing needs to be done if we don't have a tuplestore.
*/
if (tuplestore == NULL)
return;
if (map != NULL)
{
HeapTuple converted = do_convert_tuple(heaptup, map);
tuplestore_puttuple(tuplestore, converted);
pfree(converted);
}
else
tuplestore_puttuple(tuplestore, heaptup);
}
/* ---------- /* ----------
* AfterTriggerSaveEvent() * AfterTriggerSaveEvent()
@ -5777,95 +5869,37 @@ AfterTriggerSaveEvent(EState *estate, ResultRelInfo *relinfo,
{ {
HeapTuple original_insert_tuple = transition_capture->tcs_original_insert_tuple; HeapTuple original_insert_tuple = transition_capture->tcs_original_insert_tuple;
TupleConversionMap *map = transition_capture->tcs_map; TupleConversionMap *map = transition_capture->tcs_map;
bool delete_old_table = transition_capture->tcs_delete_old_table;
bool update_old_table = transition_capture->tcs_update_old_table;
bool update_new_table = transition_capture->tcs_update_new_table;
bool insert_new_table = transition_capture->tcs_insert_new_table;;
/* /*
* For INSERT events newtup should be non-NULL, for DELETE events * Capture the old tuple in the appropriate transition table based on
* oldtup should be non-NULL, whereas for UPDATE events normally both * the event.
* oldtup and newtup are non-NULL. But for UPDATE events fired for
* capturing transition tuples during UPDATE partition-key row
* movement, oldtup is NULL when the event is for a row being inserted,
* whereas newtup is NULL when the event is for a row being deleted.
*/ */
Assert(!(event == TRIGGER_EVENT_DELETE && delete_old_table && if (oldtup != NULL)
oldtup == NULL));
Assert(!(event == TRIGGER_EVENT_INSERT && insert_new_table &&
newtup == NULL));
if (oldtup != NULL &&
(event == TRIGGER_EVENT_DELETE && delete_old_table))
{ {
Tuplestorestate *old_tuplestore; Tuplestorestate *tuplestore =
AfterTriggerGetTransitionTable(event,
old_tuplestore = transition_capture->tcs_private->old_del_tuplestore; oldtup,
NULL,
if (map != NULL) transition_capture);
{ TransitionTableAddTuple(oldtup, tuplestore, map);
HeapTuple converted = do_convert_tuple(oldtup, map);
tuplestore_puttuple(old_tuplestore, converted);
pfree(converted);
} }
else
tuplestore_puttuple(old_tuplestore, oldtup); /*
} * Capture the new tuple in the appropriate transition table based on
if (oldtup != NULL && * the event.
(event == TRIGGER_EVENT_UPDATE && update_old_table)) */
if (newtup != NULL)
{ {
Tuplestorestate *old_tuplestore; Tuplestorestate *tuplestore =
AfterTriggerGetTransitionTable(event,
old_tuplestore = transition_capture->tcs_private->old_upd_tuplestore; NULL,
newtup,
if (map != NULL) transition_capture);
{
HeapTuple converted = do_convert_tuple(oldtup, map);
tuplestore_puttuple(old_tuplestore, converted);
pfree(converted);
}
else
tuplestore_puttuple(old_tuplestore, oldtup);
}
if (newtup != NULL &&
(event == TRIGGER_EVENT_INSERT && insert_new_table))
{
Tuplestorestate *new_tuplestore;
new_tuplestore = transition_capture->tcs_private->new_ins_tuplestore;
if (original_insert_tuple != NULL) if (original_insert_tuple != NULL)
tuplestore_puttuple(new_tuplestore, original_insert_tuple); tuplestore_puttuple(tuplestore, original_insert_tuple);
else if (map != NULL)
{
HeapTuple converted = do_convert_tuple(newtup, map);
tuplestore_puttuple(new_tuplestore, converted);
pfree(converted);
}
else else
tuplestore_puttuple(new_tuplestore, newtup); TransitionTableAddTuple(newtup, tuplestore, map);
}
if (newtup != NULL &&
(event == TRIGGER_EVENT_UPDATE && update_new_table))
{
Tuplestorestate *new_tuplestore;
new_tuplestore = transition_capture->tcs_private->new_upd_tuplestore;
if (original_insert_tuple != NULL)
tuplestore_puttuple(new_tuplestore, original_insert_tuple);
else if (map != NULL)
{
HeapTuple converted = do_convert_tuple(newtup, map);
tuplestore_puttuple(new_tuplestore, converted);
pfree(converted);
}
else
tuplestore_puttuple(new_tuplestore, newtup);
} }
/* /*

View File

@ -14,7 +14,7 @@ include $(top_builddir)/src/Makefile.global
OBJS = execAmi.o execCurrent.o execExpr.o execExprInterp.o \ OBJS = execAmi.o execCurrent.o execExpr.o execExprInterp.o \
execGrouping.o execIndexing.o execJunk.o \ execGrouping.o execIndexing.o execJunk.o \
execMain.o execParallel.o execPartition.o execProcnode.o \ execMain.o execMerge.o execParallel.o execPartition.o execProcnode.o \
execReplication.o execScan.o execSRF.o execTuples.o \ execReplication.o execScan.o execSRF.o execTuples.o \
execUtils.o functions.o instrument.o nodeAppend.o nodeAgg.o \ execUtils.o functions.o instrument.o nodeAppend.o nodeAgg.o \
nodeBitmapAnd.o nodeBitmapOr.o \ nodeBitmapAnd.o nodeBitmapOr.o \
@ -22,7 +22,7 @@ OBJS = execAmi.o execCurrent.o execExpr.o execExprInterp.o \
nodeCustom.o nodeFunctionscan.o nodeGather.o \ nodeCustom.o nodeFunctionscan.o nodeGather.o \
nodeHash.o nodeHashjoin.o nodeIndexscan.o nodeIndexonlyscan.o \ nodeHash.o nodeHashjoin.o nodeIndexscan.o nodeIndexonlyscan.o \
nodeLimit.o nodeLockRows.o nodeGatherMerge.o \ nodeLimit.o nodeLockRows.o nodeGatherMerge.o \
nodeMaterial.o nodeMergeAppend.o nodeMergejoin.o nodeMerge.o nodeModifyTable.o \ nodeMaterial.o nodeMergeAppend.o nodeMergejoin.o nodeModifyTable.o \
nodeNestloop.o nodeProjectSet.o nodeRecursiveunion.o nodeResult.o \ nodeNestloop.o nodeProjectSet.o nodeRecursiveunion.o nodeResult.o \
nodeSamplescan.o nodeSeqscan.o nodeSetOp.o nodeSort.o nodeUnique.o \ nodeSamplescan.o nodeSeqscan.o nodeSetOp.o nodeSort.o nodeUnique.o \
nodeValuesscan.o \ nodeValuesscan.o \

View File

@ -39,13 +39,14 @@ ModifyTable node visits each of those rows and marks the row deleted.
MERGE runs one generic plan that returns candidate target rows. Each row MERGE runs one generic plan that returns candidate target rows. Each row
consists of a super-row that contains all the columns needed by any of the consists of a super-row that contains all the columns needed by any of the
individual actions, plus a CTID and a TABLEOID junk columns. The CTID column is individual actions, plus CTID and TABLEOID junk columns. The CTID column is
required to know if a matching target row was found or not and the TABLEOID required to know if a matching target row was found or not and the TABLEOID
column is needed to find the underlying target partition, in case when the column is needed to find the underlying target partition, in case when the
target table is a partition table. If the CTID column is set we attempt to target table is a partition table. When a matching target tuple is found, the
activate WHEN MATCHED actions, or if it is NULL then we will attempt to CTID column identifies the matching target tuple and we attempt to activate
activate WHEN NOT MATCHED actions. Once we know which action is activated we WHEN MATCHED actions. If a matching tuple is not found, then CTID column is
form the final result row and apply only those changes. NULL and we attempt to activate WHEN NOT MATCHED actions. Once we know which
action is activated we form the final result row and apply only those changes.
XXX a great deal more documentation needs to be written here... XXX a great deal more documentation needs to be written here...

View File

@ -1,6 +1,6 @@
/*------------------------------------------------------------------------- /*-------------------------------------------------------------------------
* *
* nodeMerge.c * execMerge.c
* routines to handle Merge nodes relating to the MERGE command * routines to handle Merge nodes relating to the MERGE command
* *
* Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* src/backend/executor/nodeMerge.c * src/backend/executor/execMerge.c
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -22,7 +22,7 @@
#include "executor/execPartition.h" #include "executor/execPartition.h"
#include "executor/executor.h" #include "executor/executor.h"
#include "executor/nodeModifyTable.h" #include "executor/nodeModifyTable.h"
#include "executor/nodeMerge.h" #include "executor/execMerge.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "nodes/nodeFuncs.h" #include "nodes/nodeFuncs.h"
#include "storage/bufmgr.h" #include "storage/bufmgr.h"
@ -32,6 +32,110 @@
#include "utils/rel.h" #include "utils/rel.h"
#include "utils/tqual.h" #include "utils/tqual.h"
static void ExecMergeNotMatched(ModifyTableState *mtstate, EState *estate,
TupleTableSlot *slot);
static bool ExecMergeMatched(ModifyTableState *mtstate, EState *estate,
TupleTableSlot *slot, JunkFilter *junkfilter,
ItemPointer tupleid);
/*
* Perform MERGE.
*/
void
ExecMerge(ModifyTableState *mtstate, EState *estate, TupleTableSlot *slot,
JunkFilter *junkfilter, ResultRelInfo *resultRelInfo)
{
ExprContext *econtext = mtstate->ps.ps_ExprContext;
ItemPointer tupleid;
ItemPointerData tuple_ctid;
bool matched = false;
char relkind;
Datum datum;
bool isNull;
relkind = resultRelInfo->ri_RelationDesc->rd_rel->relkind;
Assert(relkind == RELKIND_RELATION ||
relkind == RELKIND_PARTITIONED_TABLE);
/*
* Reset per-tuple memory context to free any expression evaluation
* storage allocated in the previous cycle.
*/
ResetExprContext(econtext);
/*
* We run a JOIN between the target relation and the source relation to
* find a set of candidate source rows that has matching row in the target
* table and a set of candidate source rows that does not have matching
* row in the target table. If the join returns us a tuple with target
* relation's tid set, that implies that the join found a matching row for
* the given source tuple. This case triggers the WHEN MATCHED clause of
* the MERGE. Whereas a NULL in the target relation's ctid column
* indicates a NOT MATCHED case.
*/
datum = ExecGetJunkAttribute(slot, junkfilter->jf_junkAttNo, &isNull);
if (!isNull)
{
matched = true;
tupleid = (ItemPointer) DatumGetPointer(datum);
tuple_ctid = *tupleid; /* be sure we don't free ctid!! */
tupleid = &tuple_ctid;
}
else
{
matched = false;
tupleid = NULL; /* we don't need it for INSERT actions */
}
/*
* If we are dealing with a WHEN MATCHED case, we execute the first action
* for which the additional WHEN MATCHED AND quals pass. If an action
* without quals is found, that action is executed.
*
* Similarly, if we are dealing with WHEN NOT MATCHED case, we look at the
* given WHEN NOT MATCHED actions in sequence until one passes.
*
* Things get interesting in case of concurrent update/delete of the
* target tuple. Such concurrent update/delete is detected while we are
* executing a WHEN MATCHED action.
*
* A concurrent update can:
*
* 1. modify the target tuple so that it no longer satisfies the
* additional quals attached to the current WHEN MATCHED action OR
*
* In this case, we are still dealing with a WHEN MATCHED case, but
* we should recheck the list of WHEN MATCHED actions and choose the first
* one that satisfies the new target tuple.
*
* 2. modify the target tuple so that the join quals no longer pass and
* hence the source tuple no longer has a match.
*
* In the second case, the source tuple no longer matches the target tuple,
* so we now instead find a qualifying WHEN NOT MATCHED action to execute.
*
* A concurrent delete, changes a WHEN MATCHED case to WHEN NOT MATCHED.
*
* ExecMergeMatched takes care of following the update chain and
* re-finding the qualifying WHEN MATCHED action, as long as the updated
* target tuple still satisfies the join quals i.e. it still remains a
* WHEN MATCHED case. If the tuple gets deleted or the join quals fail, it
* returns and we try ExecMergeNotMatched. Given that ExecMergeMatched
* always make progress by following the update chain and we never switch
* from ExecMergeNotMatched to ExecMergeMatched, there is no risk of a
* livelock.
*/
if (matched)
matched = ExecMergeMatched(mtstate, estate, slot, junkfilter, tupleid);
/*
* Either we were dealing with a NOT MATCHED tuple or ExecMergeNotMatched()
* returned "false", indicating the previously MATCHED tuple is no longer a
* matching tuple.
*/
if (!matched)
ExecMergeNotMatched(mtstate, estate, slot);
}
/* /*
* Check and execute the first qualifying MATCHED action. The current target * Check and execute the first qualifying MATCHED action. The current target
@ -248,8 +352,9 @@ lmerge_matched:;
/* /*
* This state should never be reached since the underlying * This state should never be reached since the underlying
* JOIN runs with a MVCC snapshot and should only return * JOIN runs with a MVCC snapshot and EvalPlanQual runs
* rows visible to us. * with a dirty snapshot. So such a row should have never
* been returned for MERGE.
*/ */
elog(ERROR, "unexpected invisible tuple"); elog(ERROR, "unexpected invisible tuple");
break; break;
@ -392,10 +497,10 @@ ExecMergeNotMatched(ModifyTableState *mtstate, EState *estate,
TupleTableSlot *myslot; TupleTableSlot *myslot;
/* /*
* We are dealing with NOT MATCHED tuple. Since for MERGE, partition tree * We are dealing with NOT MATCHED tuple. Since for MERGE, the partition
* is not expanded for the result relation, we continue to work with the * tree is not expanded for the result relation, we continue to work with
* currently active result relation, which should be of the root of the * the currently active result relation, which corresponds to the root
* partition tree. * of the partition tree.
*/ */
resultRelInfo = mtstate->resultRelInfo; resultRelInfo = mtstate->resultRelInfo;
@ -474,102 +579,105 @@ ExecMergeNotMatched(ModifyTableState *mtstate, EState *estate,
} }
} }
/*
* Perform MERGE.
*/
void void
ExecMerge(ModifyTableState *mtstate, EState *estate, TupleTableSlot *slot, ExecInitMerge(ModifyTableState *mtstate, EState *estate,
JunkFilter *junkfilter, ResultRelInfo *resultRelInfo) ResultRelInfo *resultRelInfo)
{ {
ExprContext *econtext = mtstate->ps.ps_ExprContext; ListCell *l;
ItemPointer tupleid; ExprContext *econtext;
ItemPointerData tuple_ctid; List *mergeMatchedActionStates = NIL;
bool matched = false; List *mergeNotMatchedActionStates = NIL;
char relkind; TupleDesc relationDesc = resultRelInfo->ri_RelationDesc->rd_att;
Datum datum; ModifyTable *node = (ModifyTable *) mtstate->ps.plan;
bool isNull;
relkind = resultRelInfo->ri_RelationDesc->rd_rel->relkind; if (node->mergeActionList == NIL)
Assert(relkind == RELKIND_RELATION || return;
relkind == RELKIND_PARTITIONED_TABLE);
mtstate->mt_merge_subcommands = 0;
if (mtstate->ps.ps_ExprContext == NULL)
ExecAssignExprContext(estate, &mtstate->ps);
econtext = mtstate->ps.ps_ExprContext;
/* initialize slot for the existing tuple */
Assert(mtstate->mt_existing == NULL);
mtstate->mt_existing =
ExecInitExtraTupleSlot(mtstate->ps.state,
mtstate->mt_partition_tuple_routing ?
NULL : relationDesc);
/* initialize slot for merge actions */
Assert(mtstate->mt_mergeproj == NULL);
mtstate->mt_mergeproj =
ExecInitExtraTupleSlot(mtstate->ps.state,
mtstate->mt_partition_tuple_routing ?
NULL : relationDesc);
/* /*
* Reset per-tuple memory context to free any expression evaluation * Create a MergeActionState for each action on the mergeActionList
* storage allocated in the previous cycle. * and add it to either a list of matched actions or not-matched
* actions.
*/ */
ResetExprContext(econtext); foreach(l, node->mergeActionList)
/*
* We run a JOIN between the target relation and the source relation to
* find a set of candidate source rows that has matching row in the target
* table and a set of candidate source rows that does not have matching
* row in the target table. If the join returns us a tuple with target
* relation's tid set, that implies that the join found a matching row for
* the given source tuple. This case triggers the WHEN MATCHED clause of
* the MERGE. Whereas a NULL in the target relation's ctid column
* indicates a NOT MATCHED case.
*/
datum = ExecGetJunkAttribute(slot, junkfilter->jf_junkAttNo, &isNull);
if (!isNull)
{ {
matched = true; MergeAction *action = (MergeAction *) lfirst(l);
tupleid = (ItemPointer) DatumGetPointer(datum); MergeActionState *action_state = makeNode(MergeActionState);
tuple_ctid = *tupleid; /* be sure we don't free ctid!! */ TupleDesc tupDesc;
tupleid = &tuple_ctid;
} action_state->matched = action->matched;
action_state->commandType = action->commandType;
action_state->whenqual = ExecInitQual((List *) action->qual,
&mtstate->ps);
/* create target slot for this action's projection */
tupDesc = ExecTypeFromTL((List *) action->targetList,
resultRelInfo->ri_RelationDesc->rd_rel->relhasoids);
action_state->tupDesc = tupDesc;
/* build action projection state */
action_state->proj =
ExecBuildProjectionInfo(action->targetList, econtext,
mtstate->mt_mergeproj, &mtstate->ps,
resultRelInfo->ri_RelationDesc->rd_att);
/*
* We create two lists - one for WHEN MATCHED actions and one
* for WHEN NOT MATCHED actions - and stick the
* MergeActionState into the appropriate list.
*/
if (action_state->matched)
mergeMatchedActionStates =
lappend(mergeMatchedActionStates, action_state);
else else
mergeNotMatchedActionStates =
lappend(mergeNotMatchedActionStates, action_state);
switch (action->commandType)
{ {
matched = false; case CMD_INSERT:
tupleid = NULL; /* we don't need it for INSERT actions */ ExecCheckPlanOutput(resultRelInfo->ri_RelationDesc,
action->targetList);
mtstate->mt_merge_subcommands |= MERGE_INSERT;
break;
case CMD_UPDATE:
ExecCheckPlanOutput(resultRelInfo->ri_RelationDesc,
action->targetList);
mtstate->mt_merge_subcommands |= MERGE_UPDATE;
break;
case CMD_DELETE:
mtstate->mt_merge_subcommands |= MERGE_DELETE;
break;
case CMD_NOTHING:
break;
default:
elog(ERROR, "unknown operation");
break;
} }
/* resultRelInfo->ri_mergeState->matchedActionStates =
* If we are dealing with a WHEN MATCHED case, we execute the first action mergeMatchedActionStates;
* for which the additional WHEN MATCHED AND quals pass. If an action resultRelInfo->ri_mergeState->notMatchedActionStates =
* without quals is found, that action is executed. mergeNotMatchedActionStates;
* }
* Similarly, if we are dealing with WHEN NOT MATCHED case, we look at the
* given WHEN NOT MATCHED actions in sequence until one passes.
*
* Things get interesting in case of concurrent update/delete of the
* target tuple. Such concurrent update/delete is detected while we are
* executing a WHEN MATCHED action.
*
* A concurrent update can:
*
* 1. modify the target tuple so that it no longer satisfies the
* additional quals attached to the current WHEN MATCHED action OR
*
* In this case, we are still dealing with a WHEN MATCHED case, but
* we should recheck the list of WHEN MATCHED actions and choose the first
* one that satisfies the new target tuple.
*
* 2. modify the target tuple so that the join quals no longer pass and
* hence the source tuple no longer has a match.
*
* In the second case, the source tuple no longer matches the target tuple,
* so we now instead find a qualifying WHEN NOT MATCHED action to execute.
*
* A concurrent delete, changes a WHEN MATCHED case to WHEN NOT MATCHED.
*
* ExecMergeMatched takes care of following the update chain and
* re-finding the qualifying WHEN MATCHED action, as long as the updated
* target tuple still satisfies the join quals i.e. it still remains a
* WHEN MATCHED case. If the tuple gets deleted or the join quals fail, it
* returns and we try ExecMergeNotMatched. Given that ExecMergeMatched
* always make progress by following the update chain and we never switch
* from ExecMergeNotMatched to ExecMergeMatched, there is no risk of a
* livelock.
*/
if (matched)
matched = ExecMergeMatched(mtstate, estate, slot, junkfilter, tupleid);
/*
* Either we were dealing with a NOT MATCHED tuple or ExecMergeNotMatched()
* returned "false", indicating the previously MATCHED tuple is no longer a
* matching tuple.
*/
if (!matched)
ExecMergeNotMatched(mtstate, estate, slot);
} }

View File

@ -313,6 +313,10 @@ ExecFindPartition(ResultRelInfo *resultRelInfo, PartitionDispatch *pd,
/* /*
* Given OID of the partition leaf, return the index of the leaf in the * Given OID of the partition leaf, return the index of the leaf in the
* partition hierarchy. * partition hierarchy.
*
* NB: This is an O(N) operation. Unfortunately, there are many other problem
* areas with more than a handful partitions, so we don't try to optimise this
* code right now.
*/ */
int int
ExecFindPartitionByOid(PartitionTupleRouting *proute, Oid partoid) ExecFindPartitionByOid(PartitionTupleRouting *proute, Oid partoid)
@ -325,7 +329,10 @@ ExecFindPartitionByOid(PartitionTupleRouting *proute, Oid partoid)
break; break;
} }
Assert(i < proute->num_partitions); if (i >= proute->num_partitions)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("no partition found for OID %u", partoid)));
return i; return i;
} }

View File

@ -42,7 +42,7 @@
#include "commands/trigger.h" #include "commands/trigger.h"
#include "executor/execPartition.h" #include "executor/execPartition.h"
#include "executor/executor.h" #include "executor/executor.h"
#include "executor/nodeMerge.h" #include "executor/execMerge.h"
#include "executor/nodeModifyTable.h" #include "executor/nodeModifyTable.h"
#include "foreign/fdwapi.h" #include "foreign/fdwapi.h"
#include "miscadmin.h" #include "miscadmin.h"
@ -69,11 +69,6 @@ static void ExecSetupChildParentMapForSubplan(ModifyTableState *mtstate);
static TupleConversionMap *tupconv_map_for_subplan(ModifyTableState *node, static TupleConversionMap *tupconv_map_for_subplan(ModifyTableState *node,
int whichplan); int whichplan);
/* flags for mt_merge_subcommands */
#define MERGE_INSERT 0x01
#define MERGE_UPDATE 0x02
#define MERGE_DELETE 0x04
/* /*
* Verify that the tuples to be produced by INSERT or UPDATE match the * Verify that the tuples to be produced by INSERT or UPDATE match the
* target relation's rowtype * target relation's rowtype
@ -86,7 +81,7 @@ static TupleConversionMap *tupconv_map_for_subplan(ModifyTableState *node,
* The plan output is represented by its targetlist, because that makes * The plan output is represented by its targetlist, because that makes
* handling the dropped-column case easier. * handling the dropped-column case easier.
*/ */
static void void
ExecCheckPlanOutput(Relation resultRel, List *targetList) ExecCheckPlanOutput(Relation resultRel, List *targetList)
{ {
TupleDesc resultDesc = RelationGetDescr(resultRel); TupleDesc resultDesc = RelationGetDescr(resultRel);
@ -2660,104 +2655,8 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
} }
resultRelInfo = mtstate->resultRelInfo; resultRelInfo = mtstate->resultRelInfo;
if (mtstate->operation == CMD_MERGE)
if (node->mergeActionList) ExecInitMerge(mtstate, estate, resultRelInfo);
{
ListCell *l;
ExprContext *econtext;
List *mergeMatchedActionStates = NIL;
List *mergeNotMatchedActionStates = NIL;
TupleDesc relationDesc = resultRelInfo->ri_RelationDesc->rd_att;
mtstate->mt_merge_subcommands = 0;
if (mtstate->ps.ps_ExprContext == NULL)
ExecAssignExprContext(estate, &mtstate->ps);
econtext = mtstate->ps.ps_ExprContext;
/* initialize slot for the existing tuple */
Assert(mtstate->mt_existing == NULL);
mtstate->mt_existing =
ExecInitExtraTupleSlot(mtstate->ps.state,
mtstate->mt_partition_tuple_routing ?
NULL : relationDesc);
/* initialize slot for merge actions */
Assert(mtstate->mt_mergeproj == NULL);
mtstate->mt_mergeproj =
ExecInitExtraTupleSlot(mtstate->ps.state,
mtstate->mt_partition_tuple_routing ?
NULL : relationDesc);
/*
* Create a MergeActionState for each action on the mergeActionList
* and add it to either a list of matched actions or not-matched
* actions.
*/
foreach(l, node->mergeActionList)
{
MergeAction *action = (MergeAction *) lfirst(l);
MergeActionState *action_state = makeNode(MergeActionState);
TupleDesc tupDesc;
action_state->matched = action->matched;
action_state->commandType = action->commandType;
action_state->whenqual = ExecInitQual((List *) action->qual,
&mtstate->ps);
/* create target slot for this action's projection */
tupDesc = ExecTypeFromTL((List *) action->targetList,
resultRelInfo->ri_RelationDesc->rd_rel->relhasoids);
action_state->tupDesc = tupDesc;
/* build action projection state */
action_state->proj =
ExecBuildProjectionInfo(action->targetList, econtext,
mtstate->mt_mergeproj, &mtstate->ps,
resultRelInfo->ri_RelationDesc->rd_att);
/*
* We create two lists - one for WHEN MATCHED actions and one
* for WHEN NOT MATCHED actions - and stick the
* MergeActionState into the appropriate list.
*/
if (action_state->matched)
mergeMatchedActionStates =
lappend(mergeMatchedActionStates, action_state);
else
mergeNotMatchedActionStates =
lappend(mergeNotMatchedActionStates, action_state);
switch (action->commandType)
{
case CMD_INSERT:
ExecCheckPlanOutput(resultRelInfo->ri_RelationDesc,
action->targetList);
mtstate->mt_merge_subcommands |= MERGE_INSERT;
break;
case CMD_UPDATE:
ExecCheckPlanOutput(resultRelInfo->ri_RelationDesc,
action->targetList);
mtstate->mt_merge_subcommands |= MERGE_UPDATE;
break;
case CMD_DELETE:
mtstate->mt_merge_subcommands |= MERGE_DELETE;
break;
case CMD_NOTHING:
break;
default:
elog(ERROR, "unknown operation");
break;
}
resultRelInfo->ri_mergeState->matchedActionStates =
mergeMatchedActionStates;
resultRelInfo->ri_mergeState->notMatchedActionStates =
mergeNotMatchedActionStates;
}
}
/* select first subplan */ /* select first subplan */
mtstate->mt_whichplan = 0; mtstate->mt_whichplan = 0;

View File

@ -852,14 +852,14 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset)
} }
/* /*
* The MERGE produces the target rows by performing a right * The MERGE statement produces the target rows by performing a
* join between the target relation and the source relation * right join between the target relation and the source
* (which could be a plain relation or a subquery). The INSERT * relation (which could be a plain relation or a subquery).
* and UPDATE actions of the MERGE requires access to the * The INSERT and UPDATE actions of the MERGE statement
* columns from the source relation. We arrange things so that * requires access to the columns from the source relation. We
* the source relation attributes are available as INNER_VAR * arrange things so that the source relation attributes are
* and the target relation attributes are available from the * available as INNER_VAR and the target relation attributes
* scan tuple. * are available from the scan tuple.
*/ */
if (splan->mergeActionList != NIL) if (splan->mergeActionList != NIL)
{ {

View File

@ -585,7 +585,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <list> hash_partbound partbound_datum_list range_datum_list %type <list> hash_partbound partbound_datum_list range_datum_list
%type <defelt> hash_partbound_elem %type <defelt> hash_partbound_elem
%type <node> merge_when_clause opt_and_condition %type <node> merge_when_clause opt_merge_when_and_condition
%type <list> merge_when_list %type <list> merge_when_list
%type <node> merge_update merge_delete merge_insert %type <node> merge_update merge_delete merge_insert
@ -11129,7 +11129,7 @@ merge_when_list:
; ;
merge_when_clause: merge_when_clause:
WHEN MATCHED opt_and_condition THEN merge_update WHEN MATCHED opt_merge_when_and_condition THEN merge_update
{ {
MergeAction *m = makeNode(MergeAction); MergeAction *m = makeNode(MergeAction);
@ -11140,7 +11140,7 @@ merge_when_clause:
$$ = (Node *)m; $$ = (Node *)m;
} }
| WHEN MATCHED opt_and_condition THEN merge_delete | WHEN MATCHED opt_merge_when_and_condition THEN merge_delete
{ {
MergeAction *m = makeNode(MergeAction); MergeAction *m = makeNode(MergeAction);
@ -11151,7 +11151,7 @@ merge_when_clause:
$$ = (Node *)m; $$ = (Node *)m;
} }
| WHEN NOT MATCHED opt_and_condition THEN merge_insert | WHEN NOT MATCHED opt_merge_when_and_condition THEN merge_insert
{ {
MergeAction *m = makeNode(MergeAction); MergeAction *m = makeNode(MergeAction);
@ -11162,7 +11162,7 @@ merge_when_clause:
$$ = (Node *)m; $$ = (Node *)m;
} }
| WHEN NOT MATCHED opt_and_condition THEN DO NOTHING | WHEN NOT MATCHED opt_merge_when_and_condition THEN DO NOTHING
{ {
MergeAction *m = makeNode(MergeAction); MergeAction *m = makeNode(MergeAction);
@ -11175,7 +11175,7 @@ merge_when_clause:
} }
; ;
opt_and_condition: opt_merge_when_and_condition:
AND a_expr { $$ = $2; } AND a_expr { $$ = $2; }
| { $$ = NULL; } | { $$ = NULL; }
; ;

View File

@ -0,0 +1,31 @@
/*-------------------------------------------------------------------------
*
* execMerge.h
*
*
* Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/include/executor/execMerge.h
*
*-------------------------------------------------------------------------
*/
#ifndef EXECMERGE_H
#define EXECMERGE_H
#include "nodes/execnodes.h"
/* flags for mt_merge_subcommands */
#define MERGE_INSERT 0x01
#define MERGE_UPDATE 0x02
#define MERGE_DELETE 0x04
extern void ExecMerge(ModifyTableState *mtstate, EState *estate,
TupleTableSlot *slot, JunkFilter *junkfilter,
ResultRelInfo *resultRelInfo);
extern void ExecInitMerge(ModifyTableState *mtstate,
EState *estate,
ResultRelInfo *resultRelInfo);
#endif /* NODEMERGE_H */

View File

@ -1,22 +0,0 @@
/*-------------------------------------------------------------------------
*
* nodeMerge.h
*
*
* Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/include/executor/nodeMerge.h
*
*-------------------------------------------------------------------------
*/
#ifndef NODEMERGE_H
#define NODEMERGE_H
#include "nodes/execnodes.h"
extern void
ExecMerge(ModifyTableState *mtstate, EState *estate, TupleTableSlot *slot,
JunkFilter *junkfilter, ResultRelInfo *resultRelInfo);
#endif /* NODEMERGE_H */

View File

@ -39,5 +39,6 @@ extern TupleTableSlot *ExecInsert(ModifyTableState *mtstate,
EState *estate, EState *estate,
MergeActionState *actionState, MergeActionState *actionState,
bool canSetTag); bool canSetTag);
extern void ExecCheckPlanOutput(Relation resultRel, List *targetList);
#endif /* NODEMODIFYTABLE_H */ #endif /* NODEMODIFYTABLE_H */