Foreign keys on partitioned tables

Author: Álvaro Herrera
Discussion: https://postgr.es/m/20171231194359.cvojcour423ulha4@alvherre.pgsql
Reviewed-by: Peter Eisentraut
This commit is contained in:
Alvaro Herrera 2018-04-04 14:02:31 -03:00
parent 857f9c36cd
commit 3de241dba8
17 changed files with 895 additions and 109 deletions

View File

@ -368,7 +368,8 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
specified check constraints). But the
database will not assume that the constraint holds for all rows in
the table, until it is validated by using the <literal>VALIDATE
CONSTRAINT</literal> option.
CONSTRAINT</literal> option. Foreign key constraints on partitioned
tables may not be declared <literal>NOT VALID</literal> at present.
</para>
<para>

View File

@ -546,9 +546,12 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
</para>
<para>
Partitioned tables do not support <literal>EXCLUDE</literal> or
<literal>FOREIGN KEY</literal> constraints; however, you can define
these constraints on individual partitions.
Partitioned tables do not support <literal>EXCLUDE</literal> constraints;
however, you can define these constraints on individual partitions.
Also, while it's possible to define <literal>PRIMARY KEY</literal>
constraints on partitioned tables, it is not supported to create foreign
keys cannot that reference them. This restriction will be lifted in a
future release.
</para>
</listitem>
@ -907,7 +910,9 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
must have <literal>REFERENCES</literal> permission on the referenced table
(either the whole table, or the specific referenced columns).
Note that foreign key constraints cannot be defined between temporary
tables and permanent tables.
tables and permanent tables. Also note that while it is possible to
define a foreign key on a partitioned table, it is not possible to
declare a foreign key that references a partitioned table.
</para>
<para>

View File

@ -26,6 +26,7 @@
#include "catalog/pg_operator.h"
#include "catalog/pg_type.h"
#include "commands/defrem.h"
#include "commands/tablecmds.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
@ -377,6 +378,242 @@ CreateConstraintEntry(const char *constraintName,
return conOid;
}
/*
* CloneForeignKeyConstraints
* Clone foreign keys from a partitioned table to a newly acquired
* partition.
*
* relationId is a partition of parentId, so we can be certain that it has the
* same columns with the same datatypes. The columns may be in different
* order, though.
*
* The *cloned list is appended ClonedConstraint elements describing what was
* created.
*/
void
CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
{
Relation pg_constraint;
Relation parentRel;
Relation rel;
ScanKeyData key;
SysScanDesc scan;
TupleDesc tupdesc;
HeapTuple tuple;
AttrNumber *attmap;
parentRel = heap_open(parentId, NoLock); /* already got lock */
/* see ATAddForeignKeyConstraint about lock level */
rel = heap_open(relationId, AccessExclusiveLock);
pg_constraint = heap_open(ConstraintRelationId, RowShareLock);
tupdesc = RelationGetDescr(pg_constraint);
/*
* The constraint key may differ, if the columns in the partition are
* different. This map is used to convert them.
*/
attmap = convert_tuples_by_name_map(RelationGetDescr(rel),
RelationGetDescr(parentRel),
gettext_noop("could not convert row type"));
ScanKeyInit(&key,
Anum_pg_constraint_conrelid, BTEqualStrategyNumber,
F_OIDEQ, ObjectIdGetDatum(parentId));
scan = systable_beginscan(pg_constraint, ConstraintRelidIndexId, true,
NULL, 1, &key);
while ((tuple = systable_getnext(scan)) != NULL)
{
Form_pg_constraint constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
AttrNumber conkey[INDEX_MAX_KEYS];
AttrNumber mapped_conkey[INDEX_MAX_KEYS];
AttrNumber confkey[INDEX_MAX_KEYS];
Oid conpfeqop[INDEX_MAX_KEYS];
Oid conppeqop[INDEX_MAX_KEYS];
Oid conffeqop[INDEX_MAX_KEYS];
Constraint *fkconstraint;
ClonedConstraint *newc;
Oid constrOid;
ObjectAddress parentAddr,
childAddr;
int nelem;
int i;
ArrayType *arr;
Datum datum;
bool isnull;
/* only foreign keys */
if (constrForm->contype != CONSTRAINT_FOREIGN)
continue;
ObjectAddressSet(parentAddr, ConstraintRelationId,
HeapTupleGetOid(tuple));
datum = fastgetattr(tuple, Anum_pg_constraint_conkey,
tupdesc, &isnull);
if (isnull)
elog(ERROR, "null conkey");
arr = DatumGetArrayTypeP(datum);
nelem = ARR_DIMS(arr)[0];
if (ARR_NDIM(arr) != 1 ||
nelem < 1 ||
nelem > INDEX_MAX_KEYS ||
ARR_HASNULL(arr) ||
ARR_ELEMTYPE(arr) != INT2OID)
elog(ERROR, "conkey is not a 1-D smallint array");
memcpy(conkey, ARR_DATA_PTR(arr), nelem * sizeof(AttrNumber));
for (i = 0; i < nelem; i++)
mapped_conkey[i] = attmap[conkey[i] - 1];
datum = fastgetattr(tuple, Anum_pg_constraint_confkey,
tupdesc, &isnull);
if (isnull)
elog(ERROR, "null confkey");
arr = DatumGetArrayTypeP(datum);
nelem = ARR_DIMS(arr)[0];
if (ARR_NDIM(arr) != 1 ||
nelem < 1 ||
nelem > INDEX_MAX_KEYS ||
ARR_HASNULL(arr) ||
ARR_ELEMTYPE(arr) != INT2OID)
elog(ERROR, "confkey is not a 1-D smallint array");
memcpy(confkey, ARR_DATA_PTR(arr), nelem * sizeof(AttrNumber));
datum = fastgetattr(tuple, Anum_pg_constraint_conpfeqop,
tupdesc, &isnull);
if (isnull)
elog(ERROR, "null conpfeqop");
arr = DatumGetArrayTypeP(datum);
nelem = ARR_DIMS(arr)[0];
if (ARR_NDIM(arr) != 1 ||
nelem < 1 ||
nelem > INDEX_MAX_KEYS ||
ARR_HASNULL(arr) ||
ARR_ELEMTYPE(arr) != OIDOID)
elog(ERROR, "conpfeqop is not a 1-D OID array");
memcpy(conpfeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
datum = fastgetattr(tuple, Anum_pg_constraint_conpfeqop,
tupdesc, &isnull);
if (isnull)
elog(ERROR, "null conpfeqop");
arr = DatumGetArrayTypeP(datum);
nelem = ARR_DIMS(arr)[0];
if (ARR_NDIM(arr) != 1 ||
nelem < 1 ||
nelem > INDEX_MAX_KEYS ||
ARR_HASNULL(arr) ||
ARR_ELEMTYPE(arr) != OIDOID)
elog(ERROR, "conpfeqop is not a 1-D OID array");
memcpy(conpfeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
datum = fastgetattr(tuple, Anum_pg_constraint_conppeqop,
tupdesc, &isnull);
if (isnull)
elog(ERROR, "null conppeqop");
arr = DatumGetArrayTypeP(datum);
nelem = ARR_DIMS(arr)[0];
if (ARR_NDIM(arr) != 1 ||
nelem < 1 ||
nelem > INDEX_MAX_KEYS ||
ARR_HASNULL(arr) ||
ARR_ELEMTYPE(arr) != OIDOID)
elog(ERROR, "conppeqop is not a 1-D OID array");
memcpy(conppeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
datum = fastgetattr(tuple, Anum_pg_constraint_conffeqop,
tupdesc, &isnull);
if (isnull)
elog(ERROR, "null conffeqop");
arr = DatumGetArrayTypeP(datum);
nelem = ARR_DIMS(arr)[0];
if (ARR_NDIM(arr) != 1 ||
nelem < 1 ||
nelem > INDEX_MAX_KEYS ||
ARR_HASNULL(arr) ||
ARR_ELEMTYPE(arr) != OIDOID)
elog(ERROR, "conffeqop is not a 1-D OID array");
memcpy(conffeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
constrOid =
CreateConstraintEntry(NameStr(constrForm->conname),
constrForm->connamespace,
CONSTRAINT_FOREIGN,
constrForm->condeferrable,
constrForm->condeferred,
constrForm->convalidated,
HeapTupleGetOid(tuple),
relationId,
mapped_conkey,
nelem,
InvalidOid, /* not a domain constraint */
constrForm->conindid, /* same index */
constrForm->confrelid, /* same foreign rel */
confkey,
conpfeqop,
conppeqop,
conffeqop,
nelem,
constrForm->confupdtype,
constrForm->confdeltype,
constrForm->confmatchtype,
NULL,
NULL,
NULL,
NULL,
false,
1, false, true);
ObjectAddressSet(childAddr, ConstraintRelationId, constrOid);
recordDependencyOn(&childAddr, &parentAddr, DEPENDENCY_INTERNAL_AUTO);
fkconstraint = makeNode(Constraint);
/* for now this is all we need */
fkconstraint->fk_upd_action = constrForm->confupdtype;
fkconstraint->fk_del_action = constrForm->confdeltype;
fkconstraint->deferrable = constrForm->condeferrable;
fkconstraint->initdeferred = constrForm->condeferred;
createForeignKeyTriggers(rel, constrForm->confrelid, fkconstraint,
constrOid, constrForm->conindid, false);
if (cloned)
{
/*
* Feed back caller about the constraints we created, so that they can
* set up constraint verification.
*/
newc = palloc(sizeof(ClonedConstraint));
newc->relid = relationId;
newc->refrelid = constrForm->confrelid;
newc->conindid = constrForm->conindid;
newc->conid = constrOid;
newc->constraint = fkconstraint;
*cloned = lappend(*cloned, newc);
}
}
systable_endscan(scan);
pfree(attmap);
if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
{
PartitionDesc partdesc = RelationGetPartitionDesc(rel);
int i;
for (i = 0; i < partdesc->nparts; i++)
CloneForeignKeyConstraints(RelationGetRelid(rel),
partdesc->oids[i],
cloned);
}
heap_close(rel, NoLock); /* keep lock till commit */
heap_close(parentRel, NoLock);
heap_close(pg_constraint, RowShareLock);
}
/*
* Test whether given name is currently used as a constraint name

View File

@ -338,9 +338,6 @@ static void validateCheckConstraint(Relation rel, HeapTuple constrtup);
static void validateForeignKeyConstraint(char *conname,
Relation rel, Relation pkrel,
Oid pkindOid, Oid constraintOid);
static void createForeignKeyTriggers(Relation rel, Oid refRelOid,
Constraint *fkconstraint,
Oid constraintOid, Oid indexOid);
static void ATController(AlterTableStmt *parsetree,
Relation rel, List *cmds, bool recurse, LOCKMODE lockmode);
static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
@ -411,8 +408,10 @@ static ObjectAddress ATAddCheckConstraint(List **wqueue,
Constraint *constr,
bool recurse, bool recursing, bool is_readd,
LOCKMODE lockmode);
static ObjectAddress ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
Constraint *fkconstraint, LOCKMODE lockmode);
static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab,
Relation rel, Constraint *fkconstraint, Oid parentConstr,
bool recurse, bool recursing,
LOCKMODE lockmode);
static void ATExecDropConstraint(Relation rel, const char *constrName,
DropBehavior behavior,
bool recurse, bool recursing,
@ -505,6 +504,7 @@ static void refuseDupeIndexAttach(Relation parentIdx, Relation partIdx,
* relkind: relkind to assign to the new relation
* ownerId: if not InvalidOid, use this as the new relation's owner.
* typaddress: if not null, it's set to the pg_type entry's address.
* queryString: for error reporting
*
* Note that permissions checks are done against current user regardless of
* ownerId. A nonzero ownerId is used when someone is creating a relation
@ -908,8 +908,8 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
}
/*
* If we're creating a partition, create now all the indexes and triggers
* defined in the parent.
* If we're creating a partition, create now all the indexes, triggers,
* FKs defined in the parent.
*
* We can't do it earlier, because DefineIndex wants to know the partition
* key which we just stored.
@ -961,6 +961,12 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
if (parent->trigdesc != NULL)
CloneRowTriggersToPartition(parent, rel);
/*
* And foreign keys too. Note that because we're freshly creating the
* table, there is no need to verify these new constraints.
*/
CloneForeignKeyConstraints(parentId, relationId, NULL);
heap_close(parent, NoLock);
}
@ -7025,7 +7031,9 @@ ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
RelationGetNamespace(rel),
NIL);
address = ATAddForeignKeyConstraint(tab, rel, newConstraint,
address = ATAddForeignKeyConstraint(wqueue, tab, rel,
newConstraint, InvalidOid,
recurse, false,
lockmode);
break;
@ -7180,8 +7188,9 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
* We do permissions checks here, however.
*/
static ObjectAddress
ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
Constraint *fkconstraint, LOCKMODE lockmode)
ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
Constraint *fkconstraint, Oid parentConstr,
bool recurse, bool recursing, LOCKMODE lockmode)
{
Relation pkrel;
int16 pkattnum[INDEX_MAX_KEYS];
@ -7220,6 +7229,21 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
errmsg("cannot reference partitioned table \"%s\"",
RelationGetRelationName(pkrel))));
if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
{
if (!recurse)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("foreign key referencing partitioned table \"%s\" must not be ONLY",
RelationGetRelationName(pkrel))));
if (fkconstraint->skip_validation && !fkconstraint->initially_valid)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot add NOT VALID foreign key to relation \"%s\"",
RelationGetRelationName(pkrel)),
errdetail("This feature is not yet supported on partitioned tables.")));
}
if (pkrel->rd_rel->relkind != RELKIND_RELATION)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
@ -7527,7 +7551,7 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
fkconstraint->deferrable,
fkconstraint->initdeferred,
fkconstraint->initially_valid,
InvalidOid, /* no parent constraint */
parentConstr,
RelationGetRelid(rel),
fkattnum,
numfks,
@ -7553,10 +7577,12 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
ObjectAddressSet(address, ConstraintRelationId, constrOid);
/*
* Create the triggers that will enforce the constraint.
* Create the triggers that will enforce the constraint. We only want
* the action triggers to appear for the parent partitioned relation,
* even though the constraints also exist below.
*/
createForeignKeyTriggers(rel, RelationGetRelid(pkrel), fkconstraint,
constrOid, indexOid);
constrOid, indexOid, !recursing);
/*
* Tell Phase 3 to check that the constraint is satisfied by existing
@ -7580,6 +7606,40 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
tab->constraints = lappend(tab->constraints, newcon);
}
/*
* When called on a partitioned table, recurse to create the constraint on
* the partitions also.
*/
if (recurse && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
{
PartitionDesc partdesc;
partdesc = RelationGetPartitionDesc(rel);
for (i = 0; i < partdesc->nparts; i++)
{
Oid partitionId = partdesc->oids[i];
Relation partition = heap_open(partitionId, lockmode);
AlteredTableInfo *childtab;
ObjectAddress childAddr;
CheckTableNotInUse(partition, "ALTER TABLE");
/* Find or create work queue entry for this table */
childtab = ATGetQueueEntry(wqueue, partition);
childAddr =
ATAddForeignKeyConstraint(wqueue, childtab, partition,
fkconstraint, constrOid,
recurse, true, lockmode);
/* Record this constraint as dependent on the parent one */
recordDependencyOn(&childAddr, &address, DEPENDENCY_INTERNAL_AUTO);
heap_close(partition, NoLock);
}
}
/*
* Close pk table, but keep lock until we've committed.
*/
@ -7842,8 +7902,8 @@ ATExecValidateConstraint(Relation rel, char *constrName, bool recurse,
heap_close(refrel, NoLock);
/*
* Foreign keys do not inherit, so we purposely ignore the
* recursion bit here
* We disallow creating invalid foreign keys to or from
* partitioned tables, so ignoring the recursion bit is okay.
*/
}
else if (con->contype == CONSTRAINT_CHECK)
@ -8489,23 +8549,16 @@ CreateFKCheckTrigger(Oid myRelOid, Oid refRelOid, Constraint *fkconstraint,
}
/*
* Create the triggers that implement an FK constraint.
*
* NB: if you change any trigger properties here, see also
* ATExecAlterConstraint.
* createForeignKeyActionTriggers
* Create the referenced-side "action" triggers that implement a foreign
* key.
*/
static void
createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint,
Oid constraintOid, Oid indexOid)
createForeignKeyActionTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint,
Oid constraintOid, Oid indexOid)
{
Oid myRelOid;
CreateTrigStmt *fk_trigger;
myRelOid = RelationGetRelid(rel);
/* Make changes-so-far visible */
CommandCounterIncrement();
/*
* Build and execute a CREATE CONSTRAINT TRIGGER statement for the ON
* DELETE action on the referenced table.
@ -8555,7 +8608,8 @@ createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint,
}
fk_trigger->args = NIL;
(void) CreateTrigger(fk_trigger, NULL, refRelOid, myRelOid, constraintOid,
(void) CreateTrigger(fk_trigger, NULL, refRelOid, RelationGetRelid(rel),
constraintOid,
indexOid, InvalidOid, InvalidOid, NULL, true, false);
/* Make changes-so-far visible */
@ -8610,22 +8664,58 @@ createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint,
}
fk_trigger->args = NIL;
(void) CreateTrigger(fk_trigger, NULL, refRelOid, myRelOid, constraintOid,
(void) CreateTrigger(fk_trigger, NULL, refRelOid, RelationGetRelid(rel),
constraintOid,
indexOid, InvalidOid, InvalidOid, NULL, true, false);
}
/* Make changes-so-far visible */
CommandCounterIncrement();
/*
* Build and execute CREATE CONSTRAINT TRIGGER statements for the CHECK
* action for both INSERTs and UPDATEs on the referencing table.
*/
/*
* createForeignKeyCheckTriggers
* Create the referencing-side "check" triggers that implement a foreign
* key.
*/
static void
createForeignKeyCheckTriggers(Oid myRelOid, Oid refRelOid,
Constraint *fkconstraint, Oid constraintOid,
Oid indexOid)
{
CreateFKCheckTrigger(myRelOid, refRelOid, fkconstraint, constraintOid,
indexOid, true);
CreateFKCheckTrigger(myRelOid, refRelOid, fkconstraint, constraintOid,
indexOid, false);
}
/*
* Create the triggers that implement an FK constraint.
*
* NB: if you change any trigger properties here, see also
* ATExecAlterConstraint.
*/
void
createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint,
Oid constraintOid, Oid indexOid, bool create_action)
{
/*
* For the referenced side, create action triggers, if requested. (If the
* referencing side is partitioned, there is still only one trigger, which
* runs on the referenced side and points to the top of the referencing
* hierarchy.)
*/
if (create_action)
createForeignKeyActionTriggers(rel, refRelOid, fkconstraint, constraintOid,
indexOid);
/*
* For the referencing side, create the check triggers. We only need these
* on the partitions.
*/
if (rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
createForeignKeyCheckTriggers(RelationGetRelid(rel), refRelOid,
fkconstraint, constraintOid, indexOid);
CommandCounterIncrement();
}
/*
* ALTER TABLE DROP CONSTRAINT
*
@ -13889,6 +13979,8 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
bool found_whole_row;
Oid defaultPartOid;
List *partBoundConstraint;
List *cloned;
ListCell *l;
/*
* We must lock the default partition, because attaching a new partition
@ -14071,6 +14163,35 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
/* and triggers */
CloneRowTriggersToPartition(rel, attachrel);
/*
* Clone foreign key constraints, and setup for Phase 3 to verify them.
*/
cloned = NIL;
CloneForeignKeyConstraints(RelationGetRelid(rel),
RelationGetRelid(attachrel), &cloned);
foreach(l, cloned)
{
ClonedConstraint *cloned = lfirst(l);
NewConstraint *newcon;
Relation clonedrel;
AlteredTableInfo *parttab;
clonedrel = relation_open(cloned->relid, NoLock);
parttab = ATGetQueueEntry(wqueue, clonedrel);
newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
newcon->name = cloned->constraint->conname;
newcon->contype = CONSTR_FOREIGN;
newcon->refrelid = cloned->refrelid;
newcon->refindid = cloned->conindid;
newcon->conid = cloned->conid;
newcon->qual = (Node *) cloned->constraint;
parttab->constraints = lappend(parttab->constraints, newcon);
relation_close(clonedrel, NoLock);
}
/*
* Generate partition constraint from the partition bound specification.
* If the parent itself is a partition, make sure to include its

View File

@ -749,12 +749,6 @@ transformColumnDefinition(CreateStmtContext *cxt, ColumnDef *column)
errmsg("foreign key constraints are not supported on foreign tables"),
parser_errposition(cxt->pstate,
constraint->location)));
if (cxt->ispartitioned)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("foreign key constraints are not supported on partitioned tables"),
parser_errposition(cxt->pstate,
constraint->location)));
/*
* Fill in the current attribute's name and throw it into the
@ -868,12 +862,6 @@ transformTableConstraint(CreateStmtContext *cxt, Constraint *constraint)
errmsg("foreign key constraints are not supported on foreign tables"),
parser_errposition(cxt->pstate,
constraint->location)));
if (cxt->ispartitioned)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("foreign key constraints are not supported on partitioned tables"),
parser_errposition(cxt->pstate,
constraint->location)));
cxt->fkconstraints = lappend(cxt->fkconstraints, constraint);
break;

View File

@ -788,20 +788,23 @@ ri_restrict(TriggerData *trigdata, bool is_no_action)
char paramname[16];
const char *querysep;
Oid queryoids[RI_MAX_NUMKEYS];
const char *fk_only;
int i;
/* ----------
* The query string built is
* SELECT 1 FROM ONLY <fktable> x WHERE $1 = fkatt1 [AND ...]
* SELECT 1 FROM [ONLY] <fktable> x WHERE $1 = fkatt1 [AND ...]
* FOR KEY SHARE OF x
* The type id's for the $ parameters are those of the
* corresponding PK attributes.
* ----------
*/
initStringInfo(&querybuf);
fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
"" : "ONLY ";
quoteRelationName(fkrelname, fk_rel);
appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x",
fkrelname);
appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
fk_only, fkrelname);
querysep = "WHERE";
for (i = 0; i < riinfo->nkeys; i++)
{
@ -947,17 +950,21 @@ RI_FKey_cascade_del(PG_FUNCTION_ARGS)
char paramname[16];
const char *querysep;
Oid queryoids[RI_MAX_NUMKEYS];
const char *fk_only;
/* ----------
* The query string built is
* DELETE FROM ONLY <fktable> WHERE $1 = fkatt1 [AND ...]
* DELETE FROM [ONLY] <fktable> WHERE $1 = fkatt1 [AND ...]
* The type id's for the $ parameters are those of the
* corresponding PK attributes.
* ----------
*/
initStringInfo(&querybuf);
fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
"" : "ONLY ";
quoteRelationName(fkrelname, fk_rel);
appendStringInfo(&querybuf, "DELETE FROM ONLY %s", fkrelname);
appendStringInfo(&querybuf, "DELETE FROM %s%s",
fk_only, fkrelname);
querysep = "WHERE";
for (i = 0; i < riinfo->nkeys; i++)
{
@ -1118,10 +1125,11 @@ RI_FKey_cascade_upd(PG_FUNCTION_ARGS)
const char *querysep;
const char *qualsep;
Oid queryoids[RI_MAX_NUMKEYS * 2];
const char *fk_only;
/* ----------
* The query string built is
* UPDATE ONLY <fktable> SET fkatt1 = $1 [, ...]
* UPDATE [ONLY] <fktable> SET fkatt1 = $1 [, ...]
* WHERE $n = fkatt1 [AND ...]
* The type id's for the $ parameters are those of the
* corresponding PK attributes. Note that we are assuming
@ -1131,8 +1139,11 @@ RI_FKey_cascade_upd(PG_FUNCTION_ARGS)
*/
initStringInfo(&querybuf);
initStringInfo(&qualbuf);
fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
"" : "ONLY ";
quoteRelationName(fkrelname, fk_rel);
appendStringInfo(&querybuf, "UPDATE ONLY %s SET", fkrelname);
appendStringInfo(&querybuf, "UPDATE %s%s SET",
fk_only, fkrelname);
querysep = "";
qualsep = "WHERE";
for (i = 0, j = riinfo->nkeys; i < riinfo->nkeys; i++, j++)
@ -1337,11 +1348,12 @@ ri_setnull(TriggerData *trigdata)
char paramname[16];
const char *querysep;
const char *qualsep;
const char *fk_only;
Oid queryoids[RI_MAX_NUMKEYS];
/* ----------
* The query string built is
* UPDATE ONLY <fktable> SET fkatt1 = NULL [, ...]
* UPDATE [ONLY] <fktable> SET fkatt1 = NULL [, ...]
* WHERE $1 = fkatt1 [AND ...]
* The type id's for the $ parameters are those of the
* corresponding PK attributes.
@ -1349,8 +1361,11 @@ ri_setnull(TriggerData *trigdata)
*/
initStringInfo(&querybuf);
initStringInfo(&qualbuf);
fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
"" : "ONLY ";
quoteRelationName(fkrelname, fk_rel);
appendStringInfo(&querybuf, "UPDATE ONLY %s SET", fkrelname);
appendStringInfo(&querybuf, "UPDATE %s%s SET",
fk_only, fkrelname);
querysep = "";
qualsep = "WHERE";
for (i = 0; i < riinfo->nkeys; i++)
@ -1554,11 +1569,12 @@ ri_setdefault(TriggerData *trigdata)
const char *querysep;
const char *qualsep;
Oid queryoids[RI_MAX_NUMKEYS];
const char *fk_only;
int i;
/* ----------
* The query string built is
* UPDATE ONLY <fktable> SET fkatt1 = DEFAULT [, ...]
* UPDATE [ONLY] <fktable> SET fkatt1 = DEFAULT [, ...]
* WHERE $1 = fkatt1 [AND ...]
* The type id's for the $ parameters are those of the
* corresponding PK attributes.
@ -1567,7 +1583,10 @@ ri_setdefault(TriggerData *trigdata)
initStringInfo(&querybuf);
initStringInfo(&qualbuf);
quoteRelationName(fkrelname, fk_rel);
appendStringInfo(&querybuf, "UPDATE ONLY %s SET", fkrelname);
fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
"" : "ONLY ";
appendStringInfo(&querybuf, "UPDATE %s%s SET",
fk_only, fkrelname);
querysep = "";
qualsep = "WHERE";
for (i = 0; i < riinfo->nkeys; i++)
@ -1838,6 +1857,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
RangeTblEntry *pkrte;
RangeTblEntry *fkrte;
const char *sep;
const char *fk_only;
int i;
int save_nestlevel;
char workmembuf[32];
@ -1894,8 +1914,8 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
/*----------
* The query string built is:
* SELECT fk.keycols FROM ONLY relname fk
* LEFT OUTER JOIN ONLY pkrelname pk
* SELECT fk.keycols FROM [ONLY] relname fk
* LEFT OUTER JOIN pkrelname pk
* ON (pk.pkkeycol1=fk.keycol1 [AND ...])
* WHERE pk.pkkeycol1 IS NULL AND
* For MATCH SIMPLE:
@ -1920,9 +1940,11 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
quoteRelationName(pkrelname, pk_rel);
quoteRelationName(fkrelname, fk_rel);
fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
"" : "ONLY ";
appendStringInfo(&querybuf,
" FROM ONLY %s fk LEFT OUTER JOIN ONLY %s pk ON",
fkrelname, pkrelname);
" FROM %s%s fk LEFT OUTER JOIN %s pk ON",
fk_only, fkrelname, pkrelname);
strcpy(pkattname, "pk.");
strcpy(fkattname, "fk.");
@ -2298,13 +2320,6 @@ ri_FetchConstraintInfo(Trigger *trigger, Relation trig_rel, bool rel_is_pk)
elog(ERROR, "wrong pg_constraint entry for trigger \"%s\" on table \"%s\"",
trigger->tgname, RelationGetRelationName(trig_rel));
}
else
{
if (riinfo->fk_relid != RelationGetRelid(trig_rel) ||
riinfo->pk_relid != trigger->tgconstrrelid)
elog(ERROR, "wrong pg_constraint entry for trigger \"%s\" on table \"%s\"",
trigger->tgname, RelationGetRelationName(trig_rel));
}
return riinfo;
}

View File

@ -7116,13 +7116,23 @@ getConstraints(Archive *fout, TableInfo tblinfo[], int numTables)
tbinfo->dobj.name);
resetPQExpBuffer(query);
appendPQExpBuffer(query,
"SELECT tableoid, oid, conname, confrelid, "
"pg_catalog.pg_get_constraintdef(oid) AS condef "
"FROM pg_catalog.pg_constraint "
"WHERE conrelid = '%u'::pg_catalog.oid "
"AND contype = 'f'",
tbinfo->dobj.catId.oid);
if (fout->remoteVersion >= 110000)
appendPQExpBuffer(query,
"SELECT tableoid, oid, conname, confrelid, "
"pg_catalog.pg_get_constraintdef(oid) AS condef "
"FROM pg_catalog.pg_constraint "
"WHERE conrelid = '%u'::pg_catalog.oid "
"AND conparentid = 0 "
"AND contype = 'f'",
tbinfo->dobj.catId.oid);
else
appendPQExpBuffer(query,
"SELECT tableoid, oid, conname, confrelid, "
"pg_catalog.pg_get_constraintdef(oid) AS condef "
"FROM pg_catalog.pg_constraint "
"WHERE conrelid = '%u'::pg_catalog.oid "
"AND contype = 'f'",
tbinfo->dobj.catId.oid);
res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
ntups = PQntuples(res);
@ -16374,18 +16384,28 @@ dumpConstraint(Archive *fout, ConstraintInfo *coninfo)
}
else if (coninfo->contype == 'f')
{
char *only;
/*
* Foreign keys on partitioned tables are always declared as inheriting
* to partitions; for all other cases, emit them as applying ONLY
* directly to the named table, because that's how they work for
* regular inherited tables.
*/
only = tbinfo->relkind == RELKIND_PARTITIONED_TABLE ? "" : "ONLY ";
/*
* XXX Potentially wrap in a 'SET CONSTRAINTS OFF' block so that the
* current table data is not processed
*/
appendPQExpBuffer(q, "ALTER TABLE ONLY %s\n",
fmtQualifiedDumpable(tbinfo));
appendPQExpBuffer(q, "ALTER TABLE %s%s\n",
only, fmtQualifiedDumpable(tbinfo));
appendPQExpBuffer(q, " ADD CONSTRAINT %s %s;\n",
fmtId(coninfo->dobj.name),
coninfo->condef);
appendPQExpBuffer(delq, "ALTER TABLE ONLY %s ",
fmtQualifiedDumpable(tbinfo));
appendPQExpBuffer(delq, "ALTER TABLE %s%s ",
only, fmtQualifiedDumpable(tbinfo));
appendPQExpBuffer(delq, "DROP CONSTRAINT %s;\n",
fmtId(coninfo->dobj.name));

View File

@ -27,6 +27,19 @@ typedef enum ConstraintCategory
CONSTRAINT_ASSERTION /* for future expansion */
} ConstraintCategory;
/*
* Used when cloning a foreign key constraint to a partition, so that the
* caller can optionally set up a verification pass for it.
*/
typedef struct ClonedConstraint
{
Oid relid;
Oid refrelid;
Oid conindid;
Oid conid;
Constraint *constraint;
} ClonedConstraint;
extern Oid CreateConstraintEntry(const char *constraintName,
Oid constraintNamespace,
char constraintType,
@ -57,6 +70,9 @@ extern Oid CreateConstraintEntry(const char *constraintName,
bool conNoInherit,
bool is_internal);
extern void CloneForeignKeyConstraints(Oid parentId, Oid relationId,
List **cloned);
extern void RemoveConstraintById(Oid conId);
extern void RenameConstraintById(Oid conId, const char *newname);

View File

@ -74,6 +74,10 @@ extern void find_composite_type_dependencies(Oid typeOid,
extern void check_of_type(HeapTuple typetuple);
extern void createForeignKeyTriggers(Relation rel, Oid refRelOid,
Constraint *fkconstraint, Oid constraintOid,
Oid indexOid, bool create_action);
extern void register_on_commit_action(Oid relid, OnCommitAction action);
extern void remove_on_commit_action(Oid relid);

View File

@ -3305,10 +3305,6 @@ CREATE TABLE partitioned (
a int,
b int
) PARTITION BY RANGE (a, (a+b+1));
ALTER TABLE partitioned ADD FOREIGN KEY (a) REFERENCES blah;
ERROR: foreign key constraints are not supported on partitioned tables
LINE 1: ALTER TABLE partitioned ADD FOREIGN KEY (a) REFERENCES blah;
^
ALTER TABLE partitioned ADD EXCLUDE USING gist (a WITH &&);
ERROR: exclusion constraints are not supported on partitioned tables
LINE 1: ALTER TABLE partitioned ADD EXCLUDE USING gist (a WITH &&);

View File

@ -281,16 +281,6 @@ CREATE TABLE partitioned (
) PARTITION BY LIST (a1, a2); -- fail
ERROR: cannot use "list" partition strategy with more than one column
-- unsupported constraint type for partitioned tables
CREATE TABLE pkrel (
a int PRIMARY KEY
);
CREATE TABLE partitioned (
a int REFERENCES pkrel(a)
) PARTITION BY RANGE (a);
ERROR: foreign key constraints are not supported on partitioned tables
LINE 2: a int REFERENCES pkrel(a)
^
DROP TABLE pkrel;
CREATE TABLE partitioned (
a int,
EXCLUDE USING gist (a WITH &&)

View File

@ -1428,3 +1428,214 @@ alter table fktable2 drop constraint fktable2_f1_fkey;
ERROR: cannot ALTER TABLE "pktable2" because it has pending trigger events
commit;
drop table pktable2, fktable2;
--
-- Foreign keys and partitioned tables
--
-- partitioned table in the referenced side are not allowed
CREATE TABLE fk_partitioned_pk (a int, b int, primary key (a, b))
PARTITION BY RANGE (a, b);
-- verify with create table first ...
CREATE TABLE fk_notpartitioned_fk (a int, b int,
FOREIGN KEY (a, b) REFERENCES fk_partitioned_pk);
ERROR: cannot reference partitioned table "fk_partitioned_pk"
-- and then with alter table.
CREATE TABLE fk_notpartitioned_fk_2 (a int, b int);
ALTER TABLE fk_notpartitioned_fk_2 ADD FOREIGN KEY (a, b)
REFERENCES fk_partitioned_pk;
ERROR: cannot reference partitioned table "fk_partitioned_pk"
DROP TABLE fk_partitioned_pk, fk_notpartitioned_fk_2;
-- Creation of a partitioned hierarchy with irregular definitions
CREATE TABLE fk_notpartitioned_pk (fdrop1 int, a int, fdrop2 int, b int,
PRIMARY KEY (a, b));
ALTER TABLE fk_notpartitioned_pk DROP COLUMN fdrop1, DROP COLUMN fdrop2;
CREATE TABLE fk_partitioned_fk (b int, fdrop1 int, a int) PARTITION BY RANGE (a, b);
ALTER TABLE fk_partitioned_fk DROP COLUMN fdrop1;
CREATE TABLE fk_partitioned_fk_1 (fdrop1 int, fdrop2 int, a int, fdrop3 int, b int);
ALTER TABLE fk_partitioned_fk_1 DROP COLUMN fdrop1, DROP COLUMN fdrop2, DROP COLUMN fdrop3;
ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_1 FOR VALUES FROM (0,0) TO (1000,1000);
ALTER TABLE fk_partitioned_fk ADD FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk;
CREATE TABLE fk_partitioned_fk_2 (b int, fdrop1 int, fdrop2 int, a int);
ALTER TABLE fk_partitioned_fk_2 DROP COLUMN fdrop1, DROP COLUMN fdrop2;
ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2 FOR VALUES FROM (1000,1000) TO (2000,2000);
CREATE TABLE fk_partitioned_fk_3 (fdrop1 int, fdrop2 int, fdrop3 int, fdrop4 int, b int, a int)
PARTITION BY HASH (a);
ALTER TABLE fk_partitioned_fk_3 DROP COLUMN fdrop1, DROP COLUMN fdrop2,
DROP COLUMN fdrop3, DROP COLUMN fdrop4;
CREATE TABLE fk_partitioned_fk_3_0 PARTITION OF fk_partitioned_fk_3 FOR VALUES WITH (MODULUS 5, REMAINDER 0);
CREATE TABLE fk_partitioned_fk_3_1 PARTITION OF fk_partitioned_fk_3 FOR VALUES WITH (MODULUS 5, REMAINDER 1);
ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_3
FOR VALUES FROM (2000,2000) TO (3000,3000);
-- these inserts, targetting both the partition directly as well as the
-- partitioned table, should all fail
INSERT INTO fk_partitioned_fk (a,b) VALUES (500, 501);
ERROR: insert or update on table "fk_partitioned_fk_1" violates foreign key constraint "fk_partitioned_fk_a_fkey"
DETAIL: Key (a, b)=(500, 501) is not present in table "fk_notpartitioned_pk".
INSERT INTO fk_partitioned_fk_1 (a,b) VALUES (500, 501);
ERROR: insert or update on table "fk_partitioned_fk_1" violates foreign key constraint "fk_partitioned_fk_a_fkey"
DETAIL: Key (a, b)=(500, 501) is not present in table "fk_notpartitioned_pk".
INSERT INTO fk_partitioned_fk (a,b) VALUES (1500, 1501);
ERROR: insert or update on table "fk_partitioned_fk_2" violates foreign key constraint "fk_partitioned_fk_a_fkey"
DETAIL: Key (a, b)=(1500, 1501) is not present in table "fk_notpartitioned_pk".
INSERT INTO fk_partitioned_fk_2 (a,b) VALUES (1500, 1501);
ERROR: insert or update on table "fk_partitioned_fk_2" violates foreign key constraint "fk_partitioned_fk_a_fkey"
DETAIL: Key (a, b)=(1500, 1501) is not present in table "fk_notpartitioned_pk".
INSERT INTO fk_partitioned_fk (a,b) VALUES (2500, 2502);
ERROR: insert or update on table "fk_partitioned_fk_3_1" violates foreign key constraint "fk_partitioned_fk_a_fkey"
DETAIL: Key (a, b)=(2500, 2502) is not present in table "fk_notpartitioned_pk".
INSERT INTO fk_partitioned_fk_3 (a,b) VALUES (2500, 2502);
ERROR: insert or update on table "fk_partitioned_fk_3_1" violates foreign key constraint "fk_partitioned_fk_a_fkey"
DETAIL: Key (a, b)=(2500, 2502) is not present in table "fk_notpartitioned_pk".
INSERT INTO fk_partitioned_fk (a,b) VALUES (2501, 2503);
ERROR: insert or update on table "fk_partitioned_fk_3_0" violates foreign key constraint "fk_partitioned_fk_a_fkey"
DETAIL: Key (a, b)=(2501, 2503) is not present in table "fk_notpartitioned_pk".
INSERT INTO fk_partitioned_fk_3 (a,b) VALUES (2501, 2503);
ERROR: insert or update on table "fk_partitioned_fk_3_0" violates foreign key constraint "fk_partitioned_fk_a_fkey"
DETAIL: Key (a, b)=(2501, 2503) is not present in table "fk_notpartitioned_pk".
-- but if we insert the values that make them valid, then they work
INSERT INTO fk_notpartitioned_pk VALUES (500, 501), (1500, 1501),
(2500, 2502), (2501, 2503);
INSERT INTO fk_partitioned_fk (a,b) VALUES (500, 501);
INSERT INTO fk_partitioned_fk (a,b) VALUES (1500, 1501);
INSERT INTO fk_partitioned_fk (a,b) VALUES (2500, 2502);
INSERT INTO fk_partitioned_fk (a,b) VALUES (2501, 2503);
-- this update fails because there is no referenced row
UPDATE fk_partitioned_fk SET a = a + 1 WHERE a = 2501;
ERROR: insert or update on table "fk_partitioned_fk_3_1" violates foreign key constraint "fk_partitioned_fk_a_fkey"
DETAIL: Key (a, b)=(2502, 2503) is not present in table "fk_notpartitioned_pk".
-- but we can fix it thusly:
INSERT INTO fk_notpartitioned_pk (a,b) VALUES (2502, 2503);
UPDATE fk_partitioned_fk SET a = a + 1 WHERE a = 2501;
-- these updates would leave lingering rows in the referencing table; disallow
UPDATE fk_notpartitioned_pk SET b = 502 WHERE a = 500;
ERROR: update or delete on table "fk_notpartitioned_pk" violates foreign key constraint "fk_partitioned_fk_a_fkey" on table "fk_partitioned_fk"
DETAIL: Key (a, b)=(500, 501) is still referenced from table "fk_partitioned_fk".
UPDATE fk_notpartitioned_pk SET b = 1502 WHERE a = 1500;
ERROR: update or delete on table "fk_notpartitioned_pk" violates foreign key constraint "fk_partitioned_fk_a_fkey" on table "fk_partitioned_fk"
DETAIL: Key (a, b)=(1500, 1501) is still referenced from table "fk_partitioned_fk".
UPDATE fk_notpartitioned_pk SET b = 2504 WHERE a = 2500;
ERROR: update or delete on table "fk_notpartitioned_pk" violates foreign key constraint "fk_partitioned_fk_a_fkey" on table "fk_partitioned_fk"
DETAIL: Key (a, b)=(2500, 2502) is still referenced from table "fk_partitioned_fk".
ALTER TABLE fk_partitioned_fk DROP CONSTRAINT fk_partitioned_fk_a_fkey;
-- done.
DROP TABLE fk_notpartitioned_pk, fk_partitioned_fk;
-- Test some other exotic foreign key features: MATCH SIMPLE, ON UPDATE/DELETE
-- actions
CREATE TABLE fk_notpartitioned_pk (a int, b int, primary key (a, b));
CREATE TABLE fk_partitioned_fk (a int default 2501, b int default 142857) PARTITION BY LIST (a);
CREATE TABLE fk_partitioned_fk_1 PARTITION OF fk_partitioned_fk FOR VALUES IN (NULL,500,501,502);
ALTER TABLE fk_partitioned_fk ADD FOREIGN KEY (a, b)
REFERENCES fk_notpartitioned_pk MATCH SIMPLE
ON DELETE SET NULL ON UPDATE SET NULL;
CREATE TABLE fk_partitioned_fk_2 PARTITION OF fk_partitioned_fk FOR VALUES IN (1500,1502);
CREATE TABLE fk_partitioned_fk_3 (a int, b int);
ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_3 FOR VALUES IN (2500,2501,2502,2503);
-- this insert fails
INSERT INTO fk_partitioned_fk (a, b) VALUES (2502, 2503);
ERROR: insert or update on table "fk_partitioned_fk_3" violates foreign key constraint "fk_partitioned_fk_a_fkey"
DETAIL: Key (a, b)=(2502, 2503) is not present in table "fk_notpartitioned_pk".
INSERT INTO fk_partitioned_fk_3 (a, b) VALUES (2502, 2503);
ERROR: insert or update on table "fk_partitioned_fk_3" violates foreign key constraint "fk_partitioned_fk_a_fkey"
DETAIL: Key (a, b)=(2502, 2503) is not present in table "fk_notpartitioned_pk".
-- but since the FK is MATCH SIMPLE, this one doesn't
INSERT INTO fk_partitioned_fk_3 (a, b) VALUES (2502, NULL);
-- now create the referenced row ...
INSERT INTO fk_notpartitioned_pk VALUES (2502, 2503);
--- and now the same insert work
INSERT INTO fk_partitioned_fk_3 (a, b) VALUES (2502, 2503);
-- this always works
INSERT INTO fk_partitioned_fk (a,b) VALUES (NULL, NULL);
-- ON UPDATE SET NULL
SELECT tableoid::regclass, a, b FROM fk_partitioned_fk WHERE b IS NULL ORDER BY a;
tableoid | a | b
---------------------+------+---
fk_partitioned_fk_3 | 2502 |
fk_partitioned_fk_1 | |
(2 rows)
UPDATE fk_notpartitioned_pk SET a = a + 1 WHERE a = 2502;
SELECT tableoid::regclass, a, b FROM fk_partitioned_fk WHERE b IS NULL ORDER BY a;
tableoid | a | b
---------------------+------+---
fk_partitioned_fk_3 | 2502 |
fk_partitioned_fk_1 | |
fk_partitioned_fk_1 | |
(3 rows)
-- ON DELETE SET NULL
INSERT INTO fk_partitioned_fk VALUES (2503, 2503);
SELECT count(*) FROM fk_partitioned_fk WHERE a IS NULL;
count
-------
2
(1 row)
DELETE FROM fk_notpartitioned_pk;
SELECT count(*) FROM fk_partitioned_fk WHERE a IS NULL;
count
-------
3
(1 row)
-- ON UPDATE/DELETE SET DEFAULT
ALTER TABLE fk_partitioned_fk DROP CONSTRAINT fk_partitioned_fk_a_fkey;
ALTER TABLE fk_partitioned_fk ADD FOREIGN KEY (a, b)
REFERENCES fk_notpartitioned_pk
ON DELETE SET DEFAULT ON UPDATE SET DEFAULT;
INSERT INTO fk_notpartitioned_pk VALUES (2502, 2503);
INSERT INTO fk_partitioned_fk_3 (a, b) VALUES (2502, 2503);
-- this fails, because the defaults for the referencing table are not present
-- in the referenced table:
UPDATE fk_notpartitioned_pk SET a = 1500 WHERE a = 2502;
ERROR: insert or update on table "fk_partitioned_fk_3" violates foreign key constraint "fk_partitioned_fk_a_fkey"
DETAIL: Key (a, b)=(2501, 142857) is not present in table "fk_notpartitioned_pk".
-- but inserting the row we can make it work:
INSERT INTO fk_notpartitioned_pk VALUES (2501, 142857);
UPDATE fk_notpartitioned_pk SET a = 1500 WHERE a = 2502;
SELECT * FROM fk_partitioned_fk WHERE b = 142857;
a | b
------+--------
2501 | 142857
(1 row)
-- ON UPDATE/DELETE CASCADE
ALTER TABLE fk_partitioned_fk DROP CONSTRAINT fk_partitioned_fk_a_fkey;
ALTER TABLE fk_partitioned_fk ADD FOREIGN KEY (a, b)
REFERENCES fk_notpartitioned_pk
ON DELETE CASCADE ON UPDATE CASCADE;
UPDATE fk_notpartitioned_pk SET a = 2502 WHERE a = 2501;
SELECT * FROM fk_partitioned_fk WHERE b = 142857;
a | b
------+--------
2502 | 142857
(1 row)
-- Now you see it ...
SELECT * FROM fk_partitioned_fk WHERE b = 142857;
a | b
------+--------
2502 | 142857
(1 row)
DELETE FROM fk_notpartitioned_pk WHERE b = 142857;
-- now you don't.
SELECT * FROM fk_partitioned_fk WHERE a = 142857;
a | b
---+---
(0 rows)
-- verify that DROP works
DROP TABLE fk_partitioned_fk_2;
-- verify that attaching a table checks that the existing data satisfies the
-- constraint
CREATE TABLE fk_partitioned_fk_2 (a int, b int) PARTITION BY RANGE (b);
CREATE TABLE fk_partitioned_fk_2_1 PARTITION OF fk_partitioned_fk_2 FOR VALUES FROM (0) TO (1000);
CREATE TABLE fk_partitioned_fk_2_2 PARTITION OF fk_partitioned_fk_2 FOR VALUES FROM (1000) TO (2000);
INSERT INTO fk_partitioned_fk_2 VALUES (1600, 601), (1600, 1601);
ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2
FOR VALUES IN (1600);
ERROR: insert or update on table "fk_partitioned_fk_2" violates foreign key constraint "fk_partitioned_fk_a_fkey"
DETAIL: Key (a, b)=(1600, 601) is not present in table "fk_notpartitioned_pk".
INSERT INTO fk_notpartitioned_pk VALUES (1600, 601), (1600, 1601);
ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2
FOR VALUES IN (1600);
-- leave these tables around intentionally

View File

@ -1209,6 +1209,31 @@ Inherits: test_foreign_constraints
DROP TABLE test_foreign_constraints_inh;
DROP TABLE test_foreign_constraints;
DROP TABLE test_primary_constraints;
-- Test foreign key behavior
create table inh_fk_1 (a int primary key);
insert into inh_fk_1 values (1), (2), (3);
create table inh_fk_2 (x int primary key, y int references inh_fk_1 on delete cascade);
insert into inh_fk_2 values (11, 1), (22, 2), (33, 3);
create table inh_fk_2_child () inherits (inh_fk_2);
insert into inh_fk_2_child values (111, 1), (222, 2);
delete from inh_fk_1 where a = 1;
select * from inh_fk_1 order by 1;
a
---
2
3
(2 rows)
select * from inh_fk_2 order by 1, 2;
x | y
-----+---
22 | 2
33 | 3
111 | 1
222 | 2
(4 rows)
drop table inh_fk_1, inh_fk_2, inh_fk_2_child;
-- Test that parent and child CHECK constraints can be created in either order
create table p1(f1 int);
create table p1_c1() inherits(p1);

View File

@ -2035,7 +2035,6 @@ CREATE TABLE partitioned (
a int,
b int
) PARTITION BY RANGE (a, (a+b+1));
ALTER TABLE partitioned ADD FOREIGN KEY (a) REFERENCES blah;
ALTER TABLE partitioned ADD EXCLUDE USING gist (a WITH &&);
-- cannot drop column that is part of the partition key

View File

@ -298,14 +298,6 @@ CREATE TABLE partitioned (
) PARTITION BY LIST (a1, a2); -- fail
-- unsupported constraint type for partitioned tables
CREATE TABLE pkrel (
a int PRIMARY KEY
);
CREATE TABLE partitioned (
a int REFERENCES pkrel(a)
) PARTITION BY RANGE (a);
DROP TABLE pkrel;
CREATE TABLE partitioned (
a int,
EXCLUDE USING gist (a WITH &&)

View File

@ -1065,3 +1065,157 @@ alter table fktable2 drop constraint fktable2_f1_fkey;
commit;
drop table pktable2, fktable2;
--
-- Foreign keys and partitioned tables
--
-- partitioned table in the referenced side are not allowed
CREATE TABLE fk_partitioned_pk (a int, b int, primary key (a, b))
PARTITION BY RANGE (a, b);
-- verify with create table first ...
CREATE TABLE fk_notpartitioned_fk (a int, b int,
FOREIGN KEY (a, b) REFERENCES fk_partitioned_pk);
-- and then with alter table.
CREATE TABLE fk_notpartitioned_fk_2 (a int, b int);
ALTER TABLE fk_notpartitioned_fk_2 ADD FOREIGN KEY (a, b)
REFERENCES fk_partitioned_pk;
DROP TABLE fk_partitioned_pk, fk_notpartitioned_fk_2;
-- Creation of a partitioned hierarchy with irregular definitions
CREATE TABLE fk_notpartitioned_pk (fdrop1 int, a int, fdrop2 int, b int,
PRIMARY KEY (a, b));
ALTER TABLE fk_notpartitioned_pk DROP COLUMN fdrop1, DROP COLUMN fdrop2;
CREATE TABLE fk_partitioned_fk (b int, fdrop1 int, a int) PARTITION BY RANGE (a, b);
ALTER TABLE fk_partitioned_fk DROP COLUMN fdrop1;
CREATE TABLE fk_partitioned_fk_1 (fdrop1 int, fdrop2 int, a int, fdrop3 int, b int);
ALTER TABLE fk_partitioned_fk_1 DROP COLUMN fdrop1, DROP COLUMN fdrop2, DROP COLUMN fdrop3;
ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_1 FOR VALUES FROM (0,0) TO (1000,1000);
ALTER TABLE fk_partitioned_fk ADD FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk;
CREATE TABLE fk_partitioned_fk_2 (b int, fdrop1 int, fdrop2 int, a int);
ALTER TABLE fk_partitioned_fk_2 DROP COLUMN fdrop1, DROP COLUMN fdrop2;
ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2 FOR VALUES FROM (1000,1000) TO (2000,2000);
CREATE TABLE fk_partitioned_fk_3 (fdrop1 int, fdrop2 int, fdrop3 int, fdrop4 int, b int, a int)
PARTITION BY HASH (a);
ALTER TABLE fk_partitioned_fk_3 DROP COLUMN fdrop1, DROP COLUMN fdrop2,
DROP COLUMN fdrop3, DROP COLUMN fdrop4;
CREATE TABLE fk_partitioned_fk_3_0 PARTITION OF fk_partitioned_fk_3 FOR VALUES WITH (MODULUS 5, REMAINDER 0);
CREATE TABLE fk_partitioned_fk_3_1 PARTITION OF fk_partitioned_fk_3 FOR VALUES WITH (MODULUS 5, REMAINDER 1);
ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_3
FOR VALUES FROM (2000,2000) TO (3000,3000);
-- these inserts, targetting both the partition directly as well as the
-- partitioned table, should all fail
INSERT INTO fk_partitioned_fk (a,b) VALUES (500, 501);
INSERT INTO fk_partitioned_fk_1 (a,b) VALUES (500, 501);
INSERT INTO fk_partitioned_fk (a,b) VALUES (1500, 1501);
INSERT INTO fk_partitioned_fk_2 (a,b) VALUES (1500, 1501);
INSERT INTO fk_partitioned_fk (a,b) VALUES (2500, 2502);
INSERT INTO fk_partitioned_fk_3 (a,b) VALUES (2500, 2502);
INSERT INTO fk_partitioned_fk (a,b) VALUES (2501, 2503);
INSERT INTO fk_partitioned_fk_3 (a,b) VALUES (2501, 2503);
-- but if we insert the values that make them valid, then they work
INSERT INTO fk_notpartitioned_pk VALUES (500, 501), (1500, 1501),
(2500, 2502), (2501, 2503);
INSERT INTO fk_partitioned_fk (a,b) VALUES (500, 501);
INSERT INTO fk_partitioned_fk (a,b) VALUES (1500, 1501);
INSERT INTO fk_partitioned_fk (a,b) VALUES (2500, 2502);
INSERT INTO fk_partitioned_fk (a,b) VALUES (2501, 2503);
-- this update fails because there is no referenced row
UPDATE fk_partitioned_fk SET a = a + 1 WHERE a = 2501;
-- but we can fix it thusly:
INSERT INTO fk_notpartitioned_pk (a,b) VALUES (2502, 2503);
UPDATE fk_partitioned_fk SET a = a + 1 WHERE a = 2501;
-- these updates would leave lingering rows in the referencing table; disallow
UPDATE fk_notpartitioned_pk SET b = 502 WHERE a = 500;
UPDATE fk_notpartitioned_pk SET b = 1502 WHERE a = 1500;
UPDATE fk_notpartitioned_pk SET b = 2504 WHERE a = 2500;
ALTER TABLE fk_partitioned_fk DROP CONSTRAINT fk_partitioned_fk_a_fkey;
-- done.
DROP TABLE fk_notpartitioned_pk, fk_partitioned_fk;
-- Test some other exotic foreign key features: MATCH SIMPLE, ON UPDATE/DELETE
-- actions
CREATE TABLE fk_notpartitioned_pk (a int, b int, primary key (a, b));
CREATE TABLE fk_partitioned_fk (a int default 2501, b int default 142857) PARTITION BY LIST (a);
CREATE TABLE fk_partitioned_fk_1 PARTITION OF fk_partitioned_fk FOR VALUES IN (NULL,500,501,502);
ALTER TABLE fk_partitioned_fk ADD FOREIGN KEY (a, b)
REFERENCES fk_notpartitioned_pk MATCH SIMPLE
ON DELETE SET NULL ON UPDATE SET NULL;
CREATE TABLE fk_partitioned_fk_2 PARTITION OF fk_partitioned_fk FOR VALUES IN (1500,1502);
CREATE TABLE fk_partitioned_fk_3 (a int, b int);
ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_3 FOR VALUES IN (2500,2501,2502,2503);
-- this insert fails
INSERT INTO fk_partitioned_fk (a, b) VALUES (2502, 2503);
INSERT INTO fk_partitioned_fk_3 (a, b) VALUES (2502, 2503);
-- but since the FK is MATCH SIMPLE, this one doesn't
INSERT INTO fk_partitioned_fk_3 (a, b) VALUES (2502, NULL);
-- now create the referenced row ...
INSERT INTO fk_notpartitioned_pk VALUES (2502, 2503);
--- and now the same insert work
INSERT INTO fk_partitioned_fk_3 (a, b) VALUES (2502, 2503);
-- this always works
INSERT INTO fk_partitioned_fk (a,b) VALUES (NULL, NULL);
-- ON UPDATE SET NULL
SELECT tableoid::regclass, a, b FROM fk_partitioned_fk WHERE b IS NULL ORDER BY a;
UPDATE fk_notpartitioned_pk SET a = a + 1 WHERE a = 2502;
SELECT tableoid::regclass, a, b FROM fk_partitioned_fk WHERE b IS NULL ORDER BY a;
-- ON DELETE SET NULL
INSERT INTO fk_partitioned_fk VALUES (2503, 2503);
SELECT count(*) FROM fk_partitioned_fk WHERE a IS NULL;
DELETE FROM fk_notpartitioned_pk;
SELECT count(*) FROM fk_partitioned_fk WHERE a IS NULL;
-- ON UPDATE/DELETE SET DEFAULT
ALTER TABLE fk_partitioned_fk DROP CONSTRAINT fk_partitioned_fk_a_fkey;
ALTER TABLE fk_partitioned_fk ADD FOREIGN KEY (a, b)
REFERENCES fk_notpartitioned_pk
ON DELETE SET DEFAULT ON UPDATE SET DEFAULT;
INSERT INTO fk_notpartitioned_pk VALUES (2502, 2503);
INSERT INTO fk_partitioned_fk_3 (a, b) VALUES (2502, 2503);
-- this fails, because the defaults for the referencing table are not present
-- in the referenced table:
UPDATE fk_notpartitioned_pk SET a = 1500 WHERE a = 2502;
-- but inserting the row we can make it work:
INSERT INTO fk_notpartitioned_pk VALUES (2501, 142857);
UPDATE fk_notpartitioned_pk SET a = 1500 WHERE a = 2502;
SELECT * FROM fk_partitioned_fk WHERE b = 142857;
-- ON UPDATE/DELETE CASCADE
ALTER TABLE fk_partitioned_fk DROP CONSTRAINT fk_partitioned_fk_a_fkey;
ALTER TABLE fk_partitioned_fk ADD FOREIGN KEY (a, b)
REFERENCES fk_notpartitioned_pk
ON DELETE CASCADE ON UPDATE CASCADE;
UPDATE fk_notpartitioned_pk SET a = 2502 WHERE a = 2501;
SELECT * FROM fk_partitioned_fk WHERE b = 142857;
-- Now you see it ...
SELECT * FROM fk_partitioned_fk WHERE b = 142857;
DELETE FROM fk_notpartitioned_pk WHERE b = 142857;
-- now you don't.
SELECT * FROM fk_partitioned_fk WHERE a = 142857;
-- verify that DROP works
DROP TABLE fk_partitioned_fk_2;
-- verify that attaching a table checks that the existing data satisfies the
-- constraint
CREATE TABLE fk_partitioned_fk_2 (a int, b int) PARTITION BY RANGE (b);
CREATE TABLE fk_partitioned_fk_2_1 PARTITION OF fk_partitioned_fk_2 FOR VALUES FROM (0) TO (1000);
CREATE TABLE fk_partitioned_fk_2_2 PARTITION OF fk_partitioned_fk_2 FOR VALUES FROM (1000) TO (2000);
INSERT INTO fk_partitioned_fk_2 VALUES (1600, 601), (1600, 1601);
ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2
FOR VALUES IN (1600);
INSERT INTO fk_notpartitioned_pk VALUES (1600, 601), (1600, 1601);
ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2
FOR VALUES IN (1600);
-- leave these tables around intentionally

View File

@ -409,6 +409,18 @@ DROP TABLE test_foreign_constraints_inh;
DROP TABLE test_foreign_constraints;
DROP TABLE test_primary_constraints;
-- Test foreign key behavior
create table inh_fk_1 (a int primary key);
insert into inh_fk_1 values (1), (2), (3);
create table inh_fk_2 (x int primary key, y int references inh_fk_1 on delete cascade);
insert into inh_fk_2 values (11, 1), (22, 2), (33, 3);
create table inh_fk_2_child () inherits (inh_fk_2);
insert into inh_fk_2_child values (111, 1), (222, 2);
delete from inh_fk_1 where a = 1;
select * from inh_fk_1 order by 1;
select * from inh_fk_2 order by 1, 2;
drop table inh_fk_1, inh_fk_2, inh_fk_2_child;
-- Test that parent and child CHECK constraints can be created in either order
create table p1(f1 int);
create table p1_c1() inherits(p1);