mirror of
https://git.postgresql.org/git/postgresql.git
synced 2024-10-07 01:37:21 +02:00
Fix TRUNCATE .. CASCADE on partitions
When running TRUNCATE CASCADE on a child of a partitioned table referenced by another partitioned table, the truncate was not applied to partitions of the referencing table; this could leave rows violating the constraint in the referencing partitioned table. Repair by walking the pg_constraint chain all the way up to the topmost referencing table. Note: any partitioned tables containing FKs that reference other partitioned tables should be checked for possible violating rows, if TRUNCATE has occurred in partitions of the referenced table. Reported-by: Christophe Courtois Author: Jehan-Guillaume de Rorthais Discussion: https://postgr.es/m/20200204183906.115f693e@firost
This commit is contained in:
parent
598b466e80
commit
ce054a8cd4
@ -124,6 +124,9 @@ TRUNCATE [ TABLE ] [ ONLY ] <replaceable class="parameter">name</replaceable> [
|
||||
option can be used to automatically include all dependent tables —
|
||||
but be very careful when using this option, or else you might lose data you
|
||||
did not intend to!
|
||||
Note in particular that when the table to be truncated is a partition,
|
||||
siblings partitions are left untouched, but cascading occurs to all
|
||||
referencing tables and all their partitions with no distinction.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
|
@ -3399,9 +3399,16 @@ List *
|
||||
heap_truncate_find_FKs(List *relationIds)
|
||||
{
|
||||
List *result = NIL;
|
||||
List *oids = list_copy(relationIds);
|
||||
List *parent_cons;
|
||||
ListCell *cell;
|
||||
ScanKeyData key;
|
||||
Relation fkeyRel;
|
||||
SysScanDesc fkeyScan;
|
||||
HeapTuple tuple;
|
||||
bool restart;
|
||||
|
||||
oids = list_copy(relationIds);
|
||||
|
||||
/*
|
||||
* Must scan pg_constraint. Right now, it is a seqscan because there is
|
||||
@ -3409,6 +3416,10 @@ heap_truncate_find_FKs(List *relationIds)
|
||||
*/
|
||||
fkeyRel = table_open(ConstraintRelationId, AccessShareLock);
|
||||
|
||||
restart:
|
||||
restart = false;
|
||||
parent_cons = NIL;
|
||||
|
||||
fkeyScan = systable_beginscan(fkeyRel, InvalidOid, false,
|
||||
NULL, 0, NULL);
|
||||
|
||||
@ -3421,16 +3432,85 @@ heap_truncate_find_FKs(List *relationIds)
|
||||
continue;
|
||||
|
||||
/* Not referencing one of our list of tables */
|
||||
if (!list_member_oid(relationIds, con->confrelid))
|
||||
if (!list_member_oid(oids, con->confrelid))
|
||||
continue;
|
||||
|
||||
/* Add referencer unless already in input or result list */
|
||||
/*
|
||||
* If this constraint has a parent constraint which we have not seen
|
||||
* yet, keep track of it for the second loop, below. Tracking parent
|
||||
* constraints allows us to climb up to the top-level level constraint
|
||||
* and look for all possible relations referencing the partitioned
|
||||
* table.
|
||||
*/
|
||||
if (OidIsValid(con->conparentid) &&
|
||||
!list_member_oid(parent_cons, con->conparentid))
|
||||
parent_cons = lappend_oid(parent_cons, con->conparentid);
|
||||
|
||||
/*
|
||||
* Add referencer to result, unless already present in input or result
|
||||
* list.
|
||||
*/
|
||||
if (!list_member_oid(relationIds, con->conrelid))
|
||||
result = insert_ordered_unique_oid(result, con->conrelid);
|
||||
}
|
||||
|
||||
systable_endscan(fkeyScan);
|
||||
|
||||
/*
|
||||
* Process each parent constraint we found to add the list of referenced
|
||||
* relations by them to the oids list. If we do add any new such
|
||||
* relations, redo the first loop above. Also, if we see that the parent
|
||||
* constraint in turn has a parent, add that so that we process all
|
||||
* relations in a single additional pass.
|
||||
*/
|
||||
foreach(cell, parent_cons)
|
||||
{
|
||||
Oid parent = lfirst_oid(cell);
|
||||
|
||||
ScanKeyInit(&key,
|
||||
Anum_pg_constraint_oid,
|
||||
BTEqualStrategyNumber, F_OIDEQ,
|
||||
ObjectIdGetDatum(parent));
|
||||
|
||||
fkeyScan = systable_beginscan(fkeyRel, ConstraintOidIndexId,
|
||||
true, NULL, 1, &key);
|
||||
|
||||
tuple = systable_getnext(fkeyScan);
|
||||
if (HeapTupleIsValid(tuple))
|
||||
{
|
||||
Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
|
||||
|
||||
/*
|
||||
* pg_constraint rows always appear for partitioned hierarchies
|
||||
* this way: on the each side of the constraint, one row appears
|
||||
* for each partition that points to the top-most table on the
|
||||
* other side.
|
||||
*
|
||||
* Because of this arrangement, we can correctly catch all
|
||||
* relevant relations by adding to 'parent_cons' all rows with
|
||||
* valid conparentid, and to the 'oids' list all rows with a
|
||||
* zero conparentid. If any oids are added to 'oids', redo the
|
||||
* first loop above by setting 'restart'.
|
||||
*/
|
||||
if (OidIsValid(con->conparentid))
|
||||
parent_cons = list_append_unique_oid(parent_cons,
|
||||
con->conparentid);
|
||||
else if (!list_member_oid(oids, con->confrelid))
|
||||
{
|
||||
oids = lappend_oid(oids, con->confrelid);
|
||||
restart = true;
|
||||
}
|
||||
}
|
||||
|
||||
systable_endscan(fkeyScan);
|
||||
}
|
||||
|
||||
list_free(parent_cons);
|
||||
if (restart)
|
||||
goto restart;
|
||||
|
||||
table_close(fkeyRel, AccessShareLock);
|
||||
list_free(oids);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
@ -542,3 +542,53 @@ SELECT * FROM tp_chk_data();
|
||||
|
||||
DROP TABLE truncprim, truncpart;
|
||||
DROP FUNCTION tp_ins_data(), tp_chk_data();
|
||||
-- test cascade when referencing a partitioned table
|
||||
CREATE TABLE trunc_a (a INT PRIMARY KEY) PARTITION BY RANGE (a);
|
||||
CREATE TABLE trunc_a1 PARTITION OF trunc_a FOR VALUES FROM (0) TO (10);
|
||||
CREATE TABLE trunc_a2 PARTITION OF trunc_a FOR VALUES FROM (10) TO (20)
|
||||
PARTITION BY RANGE (a);
|
||||
CREATE TABLE trunc_a21 PARTITION OF trunc_a2 FOR VALUES FROM (10) TO (12);
|
||||
CREATE TABLE trunc_a22 PARTITION OF trunc_a2 FOR VALUES FROM (12) TO (16);
|
||||
CREATE TABLE trunc_a2d PARTITION OF trunc_a2 DEFAULT;
|
||||
CREATE TABLE trunc_a3 PARTITION OF trunc_a FOR VALUES FROM (20) TO (30);
|
||||
INSERT INTO trunc_a VALUES (0), (5), (10), (15), (20), (25);
|
||||
-- truncate a partition cascading to a table
|
||||
CREATE TABLE ref_b (
|
||||
b INT PRIMARY KEY,
|
||||
a INT REFERENCES trunc_a(a) ON DELETE CASCADE
|
||||
);
|
||||
INSERT INTO ref_b VALUES (10, 0), (50, 5), (100, 10), (150, 15);
|
||||
TRUNCATE TABLE trunc_a1 CASCADE;
|
||||
NOTICE: truncate cascades to table "ref_b"
|
||||
SELECT a FROM ref_b;
|
||||
a
|
||||
---
|
||||
(0 rows)
|
||||
|
||||
DROP TABLE ref_b;
|
||||
-- truncate a partition cascading to a partitioned table
|
||||
CREATE TABLE ref_c (
|
||||
c INT PRIMARY KEY,
|
||||
a INT REFERENCES trunc_a(a) ON DELETE CASCADE
|
||||
) PARTITION BY RANGE (c);
|
||||
CREATE TABLE ref_c1 PARTITION OF ref_c FOR VALUES FROM (100) TO (200);
|
||||
CREATE TABLE ref_c2 PARTITION OF ref_c FOR VALUES FROM (200) TO (300);
|
||||
INSERT INTO ref_c VALUES (100, 10), (150, 15), (200, 20), (250, 25);
|
||||
TRUNCATE TABLE trunc_a21 CASCADE;
|
||||
NOTICE: truncate cascades to table "ref_c"
|
||||
NOTICE: truncate cascades to table "ref_c1"
|
||||
NOTICE: truncate cascades to table "ref_c2"
|
||||
SELECT a as "from table ref_c" FROM ref_c;
|
||||
from table ref_c
|
||||
------------------
|
||||
(0 rows)
|
||||
|
||||
SELECT a as "from table trunc_a" FROM trunc_a ORDER BY a;
|
||||
from table trunc_a
|
||||
--------------------
|
||||
15
|
||||
20
|
||||
25
|
||||
(3 rows)
|
||||
|
||||
DROP TABLE trunc_a, ref_c;
|
||||
|
@ -289,3 +289,41 @@ TRUNCATE TABLE truncpart;
|
||||
SELECT * FROM tp_chk_data();
|
||||
DROP TABLE truncprim, truncpart;
|
||||
DROP FUNCTION tp_ins_data(), tp_chk_data();
|
||||
|
||||
-- test cascade when referencing a partitioned table
|
||||
CREATE TABLE trunc_a (a INT PRIMARY KEY) PARTITION BY RANGE (a);
|
||||
CREATE TABLE trunc_a1 PARTITION OF trunc_a FOR VALUES FROM (0) TO (10);
|
||||
CREATE TABLE trunc_a2 PARTITION OF trunc_a FOR VALUES FROM (10) TO (20)
|
||||
PARTITION BY RANGE (a);
|
||||
CREATE TABLE trunc_a21 PARTITION OF trunc_a2 FOR VALUES FROM (10) TO (12);
|
||||
CREATE TABLE trunc_a22 PARTITION OF trunc_a2 FOR VALUES FROM (12) TO (16);
|
||||
CREATE TABLE trunc_a2d PARTITION OF trunc_a2 DEFAULT;
|
||||
CREATE TABLE trunc_a3 PARTITION OF trunc_a FOR VALUES FROM (20) TO (30);
|
||||
INSERT INTO trunc_a VALUES (0), (5), (10), (15), (20), (25);
|
||||
|
||||
-- truncate a partition cascading to a table
|
||||
CREATE TABLE ref_b (
|
||||
b INT PRIMARY KEY,
|
||||
a INT REFERENCES trunc_a(a) ON DELETE CASCADE
|
||||
);
|
||||
INSERT INTO ref_b VALUES (10, 0), (50, 5), (100, 10), (150, 15);
|
||||
|
||||
TRUNCATE TABLE trunc_a1 CASCADE;
|
||||
SELECT a FROM ref_b;
|
||||
|
||||
DROP TABLE ref_b;
|
||||
|
||||
-- truncate a partition cascading to a partitioned table
|
||||
CREATE TABLE ref_c (
|
||||
c INT PRIMARY KEY,
|
||||
a INT REFERENCES trunc_a(a) ON DELETE CASCADE
|
||||
) PARTITION BY RANGE (c);
|
||||
CREATE TABLE ref_c1 PARTITION OF ref_c FOR VALUES FROM (100) TO (200);
|
||||
CREATE TABLE ref_c2 PARTITION OF ref_c FOR VALUES FROM (200) TO (300);
|
||||
INSERT INTO ref_c VALUES (100, 10), (150, 15), (200, 20), (250, 25);
|
||||
|
||||
TRUNCATE TABLE trunc_a21 CASCADE;
|
||||
SELECT a as "from table ref_c" FROM ref_c;
|
||||
SELECT a as "from table trunc_a" FROM trunc_a ORDER BY a;
|
||||
|
||||
DROP TABLE trunc_a, ref_c;
|
||||
|
Loading…
Reference in New Issue
Block a user