Fix BEFORE ROW trigger handling in cross-partition MERGE update.
Fix a bug during MERGE if a cross-partition update is attempted on a
partitioned table with a BEFORE DELETE ROW trigger that returns NULL,
to prevent the update. This would cause an error to be thrown, or an
assert failure in an assert-enabled build.
This was an oversight in 9321c79c86
, which failed to properly
distinguish a DELETE prevented by a trigger from one prevented by a
concurrent update. Fix by having ExecDelete() return the TM_Result
status to ExecCrossPartitionUpdate(), so that it can distinguish the
two cases, and make ExecCrossPartitionUpdate() return the TM_Result
status to ExecUpdateAct(), so that it can return the correct status
from a concurrent update.
In addition, ensure that the command tag is correctly updated by
having ExecMergeMatched() pass canSetTag to ExecUpdateAct(), rather
than passing false, so that it updates the command tag if it does a
cross-partition update, making this code path in ExecMergeMatched()
consistent with ExecUpdate().
Per bug #18238 from Alexander Lakhin. Back-patch to v15, where MERGE
was introduced.
Dean Rasheed, reviewed by Richard Guo and Jian He.
Discussion: https://postgr.es/m/18238-2f2bdc7f720180b9%40postgresql.org
This commit is contained in:
parent
e557db106e
commit
a0ff37173d
|
@ -1425,6 +1425,7 @@ ExecDelete(ModifyTableContext *context,
|
|||
bool processReturning,
|
||||
bool changingPart,
|
||||
bool canSetTag,
|
||||
TM_Result *tmresult,
|
||||
bool *tupleDeleted,
|
||||
TupleTableSlot **epqreturnslot)
|
||||
{
|
||||
|
@ -1441,7 +1442,7 @@ ExecDelete(ModifyTableContext *context,
|
|||
* done if it says we are.
|
||||
*/
|
||||
if (!ExecDeletePrologue(context, resultRelInfo, tupleid, oldtuple,
|
||||
epqreturnslot, NULL))
|
||||
epqreturnslot, tmresult))
|
||||
return NULL;
|
||||
|
||||
/* INSTEAD OF ROW DELETE Triggers */
|
||||
|
@ -1496,6 +1497,9 @@ ExecDelete(ModifyTableContext *context,
|
|||
ldelete:
|
||||
result = ExecDeleteAct(context, resultRelInfo, tupleid, changingPart);
|
||||
|
||||
if (tmresult)
|
||||
*tmresult = result;
|
||||
|
||||
switch (result)
|
||||
{
|
||||
case TM_SelfModified:
|
||||
|
@ -1734,6 +1738,7 @@ ExecCrossPartitionUpdate(ModifyTableContext *context,
|
|||
TupleTableSlot *slot,
|
||||
bool canSetTag,
|
||||
UpdateContext *updateCxt,
|
||||
TM_Result *tmresult,
|
||||
TupleTableSlot **retry_slot,
|
||||
TupleTableSlot **inserted_tuple,
|
||||
ResultRelInfo **insert_destrel)
|
||||
|
@ -1797,7 +1802,7 @@ ExecCrossPartitionUpdate(ModifyTableContext *context,
|
|||
false, /* processReturning */
|
||||
true, /* changingPart */
|
||||
false, /* canSetTag */
|
||||
&tuple_deleted, &epqslot);
|
||||
tmresult, &tuple_deleted, &epqslot);
|
||||
|
||||
/*
|
||||
* For some reason if DELETE didn't happen (e.g. trigger prevented it, or
|
||||
|
@ -1829,7 +1834,7 @@ ExecCrossPartitionUpdate(ModifyTableContext *context,
|
|||
* action entirely).
|
||||
*/
|
||||
if (context->relaction != NULL)
|
||||
return false;
|
||||
return *tmresult == TM_Ok;
|
||||
else if (TupIsNull(epqslot))
|
||||
return true;
|
||||
else
|
||||
|
@ -2031,6 +2036,7 @@ lreplace:
|
|||
if (ExecCrossPartitionUpdate(context, resultRelInfo,
|
||||
tupleid, oldtuple, slot,
|
||||
canSetTag, updateCxt,
|
||||
&result,
|
||||
&retry_slot,
|
||||
&inserted_tuple,
|
||||
&insert_destrel))
|
||||
|
@ -2070,7 +2076,7 @@ lreplace:
|
|||
* here; instead let it handle that on its own rules.
|
||||
*/
|
||||
if (context->relaction != NULL)
|
||||
return TM_Updated;
|
||||
return result;
|
||||
|
||||
/*
|
||||
* ExecCrossPartitionUpdate installed an updated version of the new
|
||||
|
@ -2898,7 +2904,7 @@ lmerge_matched:
|
|||
break; /* concurrent update/delete */
|
||||
}
|
||||
result = ExecUpdateAct(context, resultRelInfo, tupleid, NULL,
|
||||
newslot, false, &updateCxt);
|
||||
newslot, canSetTag, &updateCxt);
|
||||
|
||||
/*
|
||||
* As in ExecUpdate(), if ExecUpdateAct() reports that a
|
||||
|
@ -2910,8 +2916,6 @@ lmerge_matched:
|
|||
if (updateCxt.crossPartUpdate)
|
||||
{
|
||||
mtstate->mt_merge_updated += 1;
|
||||
if (canSetTag)
|
||||
(estate->es_processed)++;
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -3843,7 +3847,7 @@ ExecModifyTable(PlanState *pstate)
|
|||
|
||||
case CMD_DELETE:
|
||||
slot = ExecDelete(&context, resultRelInfo, tupleid, oldtuple,
|
||||
true, false, node->canSetTag, NULL, NULL);
|
||||
true, false, node->canSetTag, NULL, NULL, NULL);
|
||||
break;
|
||||
|
||||
case CMD_MERGE:
|
||||
|
|
|
@ -1662,6 +1662,10 @@ ALTER TABLE pa_target ATTACH PARTITION part4 DEFAULT;
|
|||
INSERT INTO pa_target SELECT id, id * 100, 'initial' FROM generate_series(1,14,2) AS id;
|
||||
-- try simple MERGE
|
||||
BEGIN;
|
||||
DO $$
|
||||
DECLARE
|
||||
result integer;
|
||||
BEGIN
|
||||
MERGE INTO pa_target t
|
||||
USING pa_source s
|
||||
ON t.tid = s.sid
|
||||
|
@ -1669,6 +1673,11 @@ MERGE INTO pa_target t
|
|||
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (sid, delta, 'inserted by merge');
|
||||
GET DIAGNOSTICS result := ROW_COUNT;
|
||||
RAISE NOTICE 'ROW_COUNT = %', result;
|
||||
END;
|
||||
$$;
|
||||
NOTICE: ROW_COUNT = 14
|
||||
SELECT * FROM pa_target ORDER BY tid;
|
||||
tid | balance | val
|
||||
-----+---------+--------------------------
|
||||
|
@ -1725,6 +1734,10 @@ SELECT * FROM pa_target ORDER BY tid;
|
|||
ROLLBACK;
|
||||
-- try updating the partition key column
|
||||
BEGIN;
|
||||
DO $$
|
||||
DECLARE
|
||||
result integer;
|
||||
BEGIN
|
||||
MERGE INTO pa_target t
|
||||
USING pa_source s
|
||||
ON t.tid = s.sid
|
||||
|
@ -1732,6 +1745,11 @@ MERGE INTO pa_target t
|
|||
UPDATE SET tid = tid + 1, balance = balance + delta, val = val || ' updated by merge'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (sid, delta, 'inserted by merge');
|
||||
GET DIAGNOSTICS result := ROW_COUNT;
|
||||
RAISE NOTICE 'ROW_COUNT = %', result;
|
||||
END;
|
||||
$$;
|
||||
NOTICE: ROW_COUNT = 14
|
||||
SELECT * FROM pa_target ORDER BY tid;
|
||||
tid | balance | val
|
||||
-----+---------+--------------------------
|
||||
|
@ -1751,6 +1769,79 @@ SELECT * FROM pa_target ORDER BY tid;
|
|||
14 | 140 | inserted by merge
|
||||
(14 rows)
|
||||
|
||||
ROLLBACK;
|
||||
-- as above, but blocked by BEFORE DELETE ROW trigger
|
||||
BEGIN;
|
||||
CREATE FUNCTION trig_fn() RETURNS trigger LANGUAGE plpgsql AS
|
||||
$$ BEGIN RETURN NULL; END; $$;
|
||||
CREATE TRIGGER del_trig BEFORE DELETE ON pa_target
|
||||
FOR EACH ROW EXECUTE PROCEDURE trig_fn();
|
||||
DO $$
|
||||
DECLARE
|
||||
result integer;
|
||||
BEGIN
|
||||
MERGE INTO pa_target t
|
||||
USING pa_source s
|
||||
ON t.tid = s.sid
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET tid = tid + 1, balance = balance + delta, val = val || ' updated by merge'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (sid, delta, 'inserted by merge');
|
||||
GET DIAGNOSTICS result := ROW_COUNT;
|
||||
RAISE NOTICE 'ROW_COUNT = %', result;
|
||||
END;
|
||||
$$;
|
||||
NOTICE: ROW_COUNT = 10
|
||||
SELECT * FROM pa_target ORDER BY tid;
|
||||
tid | balance | val
|
||||
-----+---------+--------------------------
|
||||
1 | 100 | initial
|
||||
2 | 20 | inserted by merge
|
||||
3 | 300 | initial
|
||||
4 | 40 | inserted by merge
|
||||
6 | 550 | initial updated by merge
|
||||
6 | 60 | inserted by merge
|
||||
7 | 700 | initial
|
||||
8 | 80 | inserted by merge
|
||||
9 | 900 | initial
|
||||
10 | 100 | inserted by merge
|
||||
12 | 1210 | initial updated by merge
|
||||
12 | 120 | inserted by merge
|
||||
14 | 1430 | initial updated by merge
|
||||
14 | 140 | inserted by merge
|
||||
(14 rows)
|
||||
|
||||
ROLLBACK;
|
||||
-- as above, but blocked by BEFORE INSERT ROW trigger
|
||||
BEGIN;
|
||||
CREATE FUNCTION trig_fn() RETURNS trigger LANGUAGE plpgsql AS
|
||||
$$ BEGIN RETURN NULL; END; $$;
|
||||
CREATE TRIGGER ins_trig BEFORE INSERT ON pa_target
|
||||
FOR EACH ROW EXECUTE PROCEDURE trig_fn();
|
||||
DO $$
|
||||
DECLARE
|
||||
result integer;
|
||||
BEGIN
|
||||
MERGE INTO pa_target t
|
||||
USING pa_source s
|
||||
ON t.tid = s.sid
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET tid = tid + 1, balance = balance + delta, val = val || ' updated by merge'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (sid, delta, 'inserted by merge');
|
||||
GET DIAGNOSTICS result := ROW_COUNT;
|
||||
RAISE NOTICE 'ROW_COUNT = %', result;
|
||||
END;
|
||||
$$;
|
||||
NOTICE: ROW_COUNT = 3
|
||||
SELECT * FROM pa_target ORDER BY tid;
|
||||
tid | balance | val
|
||||
-----+---------+--------------------------
|
||||
6 | 550 | initial updated by merge
|
||||
12 | 1210 | initial updated by merge
|
||||
14 | 1430 | initial updated by merge
|
||||
(3 rows)
|
||||
|
||||
ROLLBACK;
|
||||
-- test RLS enforcement
|
||||
BEGIN;
|
||||
|
|
|
@ -1060,6 +1060,10 @@ INSERT INTO pa_target SELECT id, id * 100, 'initial' FROM generate_series(1,14,2
|
|||
|
||||
-- try simple MERGE
|
||||
BEGIN;
|
||||
DO $$
|
||||
DECLARE
|
||||
result integer;
|
||||
BEGIN
|
||||
MERGE INTO pa_target t
|
||||
USING pa_source s
|
||||
ON t.tid = s.sid
|
||||
|
@ -1067,6 +1071,10 @@ MERGE INTO pa_target t
|
|||
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (sid, delta, 'inserted by merge');
|
||||
GET DIAGNOSTICS result := ROW_COUNT;
|
||||
RAISE NOTICE 'ROW_COUNT = %', result;
|
||||
END;
|
||||
$$;
|
||||
SELECT * FROM pa_target ORDER BY tid;
|
||||
ROLLBACK;
|
||||
|
||||
|
@ -1085,6 +1093,10 @@ ROLLBACK;
|
|||
|
||||
-- try updating the partition key column
|
||||
BEGIN;
|
||||
DO $$
|
||||
DECLARE
|
||||
result integer;
|
||||
BEGIN
|
||||
MERGE INTO pa_target t
|
||||
USING pa_source s
|
||||
ON t.tid = s.sid
|
||||
|
@ -1092,6 +1104,58 @@ MERGE INTO pa_target t
|
|||
UPDATE SET tid = tid + 1, balance = balance + delta, val = val || ' updated by merge'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (sid, delta, 'inserted by merge');
|
||||
GET DIAGNOSTICS result := ROW_COUNT;
|
||||
RAISE NOTICE 'ROW_COUNT = %', result;
|
||||
END;
|
||||
$$;
|
||||
SELECT * FROM pa_target ORDER BY tid;
|
||||
ROLLBACK;
|
||||
|
||||
-- as above, but blocked by BEFORE DELETE ROW trigger
|
||||
BEGIN;
|
||||
CREATE FUNCTION trig_fn() RETURNS trigger LANGUAGE plpgsql AS
|
||||
$$ BEGIN RETURN NULL; END; $$;
|
||||
CREATE TRIGGER del_trig BEFORE DELETE ON pa_target
|
||||
FOR EACH ROW EXECUTE PROCEDURE trig_fn();
|
||||
DO $$
|
||||
DECLARE
|
||||
result integer;
|
||||
BEGIN
|
||||
MERGE INTO pa_target t
|
||||
USING pa_source s
|
||||
ON t.tid = s.sid
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET tid = tid + 1, balance = balance + delta, val = val || ' updated by merge'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (sid, delta, 'inserted by merge');
|
||||
GET DIAGNOSTICS result := ROW_COUNT;
|
||||
RAISE NOTICE 'ROW_COUNT = %', result;
|
||||
END;
|
||||
$$;
|
||||
SELECT * FROM pa_target ORDER BY tid;
|
||||
ROLLBACK;
|
||||
|
||||
-- as above, but blocked by BEFORE INSERT ROW trigger
|
||||
BEGIN;
|
||||
CREATE FUNCTION trig_fn() RETURNS trigger LANGUAGE plpgsql AS
|
||||
$$ BEGIN RETURN NULL; END; $$;
|
||||
CREATE TRIGGER ins_trig BEFORE INSERT ON pa_target
|
||||
FOR EACH ROW EXECUTE PROCEDURE trig_fn();
|
||||
DO $$
|
||||
DECLARE
|
||||
result integer;
|
||||
BEGIN
|
||||
MERGE INTO pa_target t
|
||||
USING pa_source s
|
||||
ON t.tid = s.sid
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET tid = tid + 1, balance = balance + delta, val = val || ' updated by merge'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (sid, delta, 'inserted by merge');
|
||||
GET DIAGNOSTICS result := ROW_COUNT;
|
||||
RAISE NOTICE 'ROW_COUNT = %', result;
|
||||
END;
|
||||
$$;
|
||||
SELECT * FROM pa_target ORDER BY tid;
|
||||
ROLLBACK;
|
||||
|
||||
|
|
Loading…
Reference in New Issue