/*-------------------------------------------------------------------------
 *
 * pg_constraint.c
 *    routines to support manipulation of the pg_constraint relation
 *
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    src/backend/catalog/pg_constraint.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/sysattr.h"
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/objectaccess.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_constraint_fn.h"
#include "catalog/pg_operator.h"
#include "catalog/pg_type.h"
#include "commands/defrem.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/tqual.h"
/*
 * CreateConstraintEntry
 *    Create a constraint table entry.
 *
 * Subsidiary records (such as triggers or indexes to implement the
 * constraint) are *not* created here.  But we do make dependency links
 * from the constraint to the things it depends on.
 *
 * constraintKey is read for constraintNKeys entries.  foreignKey, pfEqOp,
 * ppEqOp and ffEqOp are each read for foreignNKeys entries.  exclOp, when
 * not NULL, is read for constraintNKeys entries.  conBin and conSrc may be
 * NULL, in which case the corresponding columns are stored as SQL NULL.
 * conExpr, when not NULL, is scanned to record dependencies on the objects
 * it mentions.  is_internal is passed through to the post-creation hook.
 *
 * The new constraint's OID is returned.
 */
Oid
CreateConstraintEntry(const char *constraintName,
                      Oid constraintNamespace,
                      char constraintType,
                      bool isDeferrable,
                      bool isDeferred,
                      bool isValidated,
                      Oid relId,
                      const int16 *constraintKey,
                      int constraintNKeys,
                      Oid domainId,
                      Oid indexRelId,
                      Oid foreignRelId,
                      const int16 *foreignKey,
                      const Oid *pfEqOp,
                      const Oid *ppEqOp,
                      const Oid *ffEqOp,
                      int foreignNKeys,
                      char foreignUpdateType,
                      char foreignDeleteType,
                      char foreignMatchType,
                      const Oid *exclOp,
                      Node *conExpr,
                      const char *conBin,
                      const char *conSrc,
                      bool conIsLocal,
                      int conInhCount,
                      bool conNoInherit,
                      bool is_internal)
{
    Relation    conDesc;
    Oid         conOid;
    HeapTuple   tup;
    bool        nulls[Natts_pg_constraint];
    Datum       values[Natts_pg_constraint];
    ArrayType  *conkeyArray;
    ArrayType  *confkeyArray;
    ArrayType  *conpfeqopArray;
    ArrayType  *conppeqopArray;
    ArrayType  *conffeqopArray;
    ArrayType  *conexclopArray;
    NameData    cname;
    int         i;
    ObjectAddress conobject;

    conDesc = heap_open(ConstraintRelationId, RowExclusiveLock);

    Assert(constraintName);
    namestrcpy(&cname, constraintName);

    /*
     * Convert C arrays into Postgres arrays.
     */
    if (constraintNKeys > 0)
    {
        Datum      *conkey;

        conkey = (Datum *) palloc(constraintNKeys * sizeof(Datum));
        for (i = 0; i < constraintNKeys; i++)
            conkey[i] = Int16GetDatum(constraintKey[i]);
        conkeyArray = construct_array(conkey, constraintNKeys,
                                      INT2OID, 2, true, 's');
    }
    else
        conkeyArray = NULL;

    if (foreignNKeys > 0)
    {
        Datum      *fkdatums;

        /* One scratch buffer is reused for all four foreign-key arrays */
        fkdatums = (Datum *) palloc(foreignNKeys * sizeof(Datum));
        for (i = 0; i < foreignNKeys; i++)
            fkdatums[i] = Int16GetDatum(foreignKey[i]);
        confkeyArray = construct_array(fkdatums, foreignNKeys,
                                       INT2OID, 2, true, 's');
        for (i = 0; i < foreignNKeys; i++)
            fkdatums[i] = ObjectIdGetDatum(pfEqOp[i]);
        conpfeqopArray = construct_array(fkdatums, foreignNKeys,
                                         OIDOID, sizeof(Oid), true, 'i');
        for (i = 0; i < foreignNKeys; i++)
            fkdatums[i] = ObjectIdGetDatum(ppEqOp[i]);
        conppeqopArray = construct_array(fkdatums, foreignNKeys,
                                         OIDOID, sizeof(Oid), true, 'i');
        for (i = 0; i < foreignNKeys; i++)
            fkdatums[i] = ObjectIdGetDatum(ffEqOp[i]);
        conffeqopArray = construct_array(fkdatums, foreignNKeys,
                                         OIDOID, sizeof(Oid), true, 'i');
    }
    else
    {
        confkeyArray = NULL;
        conpfeqopArray = NULL;
        conppeqopArray = NULL;
        conffeqopArray = NULL;
    }

    if (exclOp != NULL)
    {
        Datum      *opdatums;

        /* Exclusion operators are indexed by constraint key position */
        opdatums = (Datum *) palloc(constraintNKeys * sizeof(Datum));
        for (i = 0; i < constraintNKeys; i++)
            opdatums[i] = ObjectIdGetDatum(exclOp[i]);
        conexclopArray = construct_array(opdatums, constraintNKeys,
                                         OIDOID, sizeof(Oid), true, 'i');
    }
    else
        conexclopArray = NULL;

    /* initialize nulls and values */
    for (i = 0; i < Natts_pg_constraint; i++)
    {
        nulls[i] = false;
        values[i] = (Datum) NULL;
    }

    values[Anum_pg_constraint_conname - 1] = NameGetDatum(&cname);
    values[Anum_pg_constraint_connamespace - 1] = ObjectIdGetDatum(constraintNamespace);
    values[Anum_pg_constraint_contype - 1] = CharGetDatum(constraintType);
    values[Anum_pg_constraint_condeferrable - 1] = BoolGetDatum(isDeferrable);
    values[Anum_pg_constraint_condeferred - 1] = BoolGetDatum(isDeferred);
    values[Anum_pg_constraint_convalidated - 1] = BoolGetDatum(isValidated);
    values[Anum_pg_constraint_conrelid - 1] = ObjectIdGetDatum(relId);
    values[Anum_pg_constraint_contypid - 1] = ObjectIdGetDatum(domainId);
    values[Anum_pg_constraint_conindid - 1] = ObjectIdGetDatum(indexRelId);
    values[Anum_pg_constraint_confrelid - 1] = ObjectIdGetDatum(foreignRelId);
    values[Anum_pg_constraint_confupdtype - 1] = CharGetDatum(foreignUpdateType);
    values[Anum_pg_constraint_confdeltype - 1] = CharGetDatum(foreignDeleteType);
    values[Anum_pg_constraint_confmatchtype - 1] = CharGetDatum(foreignMatchType);
    values[Anum_pg_constraint_conislocal - 1] = BoolGetDatum(conIsLocal);
    values[Anum_pg_constraint_coninhcount - 1] = Int32GetDatum(conInhCount);
    values[Anum_pg_constraint_connoinherit - 1] = BoolGetDatum(conNoInherit);

    if (conkeyArray)
        values[Anum_pg_constraint_conkey - 1] = PointerGetDatum(conkeyArray);
    else
        nulls[Anum_pg_constraint_conkey - 1] = true;

    if (confkeyArray)
        values[Anum_pg_constraint_confkey - 1] = PointerGetDatum(confkeyArray);
    else
        nulls[Anum_pg_constraint_confkey - 1] = true;

    if (conpfeqopArray)
        values[Anum_pg_constraint_conpfeqop - 1] = PointerGetDatum(conpfeqopArray);
    else
        nulls[Anum_pg_constraint_conpfeqop - 1] = true;

    if (conppeqopArray)
        values[Anum_pg_constraint_conppeqop - 1] = PointerGetDatum(conppeqopArray);
    else
        nulls[Anum_pg_constraint_conppeqop - 1] = true;

    if (conffeqopArray)
        values[Anum_pg_constraint_conffeqop - 1] = PointerGetDatum(conffeqopArray);
    else
        nulls[Anum_pg_constraint_conffeqop - 1] = true;

    if (conexclopArray)
        values[Anum_pg_constraint_conexclop - 1] = PointerGetDatum(conexclopArray);
    else
        nulls[Anum_pg_constraint_conexclop - 1] = true;

    /*
     * initialize the binary form of the check constraint.
     */
    if (conBin)
        values[Anum_pg_constraint_conbin - 1] = CStringGetTextDatum(conBin);
    else
        nulls[Anum_pg_constraint_conbin - 1] = true;

    /*
     * initialize the text form of the check constraint
     */
    if (conSrc)
        values[Anum_pg_constraint_consrc - 1] = CStringGetTextDatum(conSrc);
    else
        nulls[Anum_pg_constraint_consrc - 1] = true;

    tup = heap_form_tuple(RelationGetDescr(conDesc), values, nulls);

    conOid = CatalogTupleInsert(conDesc, tup);

    conobject.classId = ConstraintRelationId;
    conobject.objectId = conOid;
    conobject.objectSubId = 0;

    heap_close(conDesc, RowExclusiveLock);

    if (OidIsValid(relId))
    {
        /*
         * Register auto dependency from constraint to owning relation, or to
         * specific column(s) if any are mentioned.
         */
        ObjectAddress relobject;

        relobject.classId = RelationRelationId;
        relobject.objectId = relId;
        if (constraintNKeys > 0)
        {
            for (i = 0; i < constraintNKeys; i++)
            {
                relobject.objectSubId = constraintKey[i];

                recordDependencyOn(&conobject, &relobject, DEPENDENCY_AUTO);
            }
        }
        else
        {
            relobject.objectSubId = 0;

            recordDependencyOn(&conobject, &relobject, DEPENDENCY_AUTO);
        }
    }

    if (OidIsValid(domainId))
    {
        /*
         * Register auto dependency from constraint to owning domain
         */
        ObjectAddress domobject;

        domobject.classId = TypeRelationId;
        domobject.objectId = domainId;
        domobject.objectSubId = 0;

        recordDependencyOn(&conobject, &domobject, DEPENDENCY_AUTO);
    }

    if (OidIsValid(foreignRelId))
    {
        /*
         * Register normal dependency from constraint to foreign relation, or
         * to specific column(s) if any are mentioned.
         */
        ObjectAddress relobject;

        relobject.classId = RelationRelationId;
        relobject.objectId = foreignRelId;
        if (foreignNKeys > 0)
        {
            for (i = 0; i < foreignNKeys; i++)
            {
                relobject.objectSubId = foreignKey[i];

                recordDependencyOn(&conobject, &relobject, DEPENDENCY_NORMAL);
            }
        }
        else
        {
            relobject.objectSubId = 0;

            recordDependencyOn(&conobject, &relobject, DEPENDENCY_NORMAL);
        }
    }

    if (OidIsValid(indexRelId) && constraintType == CONSTRAINT_FOREIGN)
    {
        /*
         * Register normal dependency on the unique index that supports a
         * foreign-key constraint.  (Note: for indexes associated with unique
         * or primary-key constraints, the dependency runs the other way, and
         * is not made here.)
         */
        ObjectAddress relobject;

        relobject.classId = RelationRelationId;
        relobject.objectId = indexRelId;
        relobject.objectSubId = 0;

        recordDependencyOn(&conobject, &relobject, DEPENDENCY_NORMAL);
    }

    if (foreignNKeys > 0)
    {
        /*
         * Register normal dependencies on the equality operators that support
         * a foreign-key constraint.  If the PK and FK types are the same then
         * all three operators for a column are the same; otherwise they are
         * different.
         */
        ObjectAddress oprobject;

        oprobject.classId = OperatorRelationId;
        oprobject.objectSubId = 0;

        for (i = 0; i < foreignNKeys; i++)
        {
            oprobject.objectId = pfEqOp[i];
            recordDependencyOn(&conobject, &oprobject, DEPENDENCY_NORMAL);
            /* Skip operators identical to pfEqOp to avoid duplicate entries */
            if (ppEqOp[i] != pfEqOp[i])
            {
                oprobject.objectId = ppEqOp[i];
                recordDependencyOn(&conobject, &oprobject, DEPENDENCY_NORMAL);
            }
            if (ffEqOp[i] != pfEqOp[i])
            {
                oprobject.objectId = ffEqOp[i];
                recordDependencyOn(&conobject, &oprobject, DEPENDENCY_NORMAL);
            }
        }
    }

    /*
     * We don't bother to register dependencies on the exclusion operators of
     * an exclusion constraint.  We assume they are members of the opclass
     * supporting the index, so there's an indirect dependency via that. (This
     * would be pretty dicey for cross-type operators, but exclusion operators
     * can never be cross-type.)
     */

    if (conExpr != NULL)
    {
        /*
         * Register dependencies from constraint to objects mentioned in CHECK
         * expression.
         */
        recordDependencyOnSingleRelExpr(&conobject, conExpr, relId,
                                        DEPENDENCY_NORMAL,
                                        DEPENDENCY_NORMAL, false);
    }

    /* Post creation hook for new constraint */
    InvokeObjectPostCreateHookArg(ConstraintRelationId, conOid, 0,
                                  is_internal);

    return conOid;
}
/*
 * Test whether given name is currently used as a constraint name
 * for the given object (relation or domain).
 *
 * This is used to decide whether to accept a user-specified constraint name.
 * It is deliberately not the same test as ChooseConstraintName uses to decide
 * whether an auto-generated name is OK: here, we will allow it unless there
 * is an identical constraint name in use *on the same object*.
 *
 * conCat selects whether objId is compared against conrelid
 * (CONSTRAINT_RELATION) or contypid (CONSTRAINT_DOMAIN).  Only rows whose
 * name equals conname and whose namespace equals objNamespace are examined.
 *
 * Returns true if a matching constraint exists, false otherwise.
 *
 * NB: Caller should hold exclusive lock on the given object, else
 * this test can be fooled by concurrent additions.
 */
bool
ConstraintNameIsUsed(ConstraintCategory conCat, Oid objId,
                     Oid objNamespace, const char *conname)
{
    bool        found;
    Relation    conDesc;
    SysScanDesc conscan;
    ScanKeyData skey[2];
    HeapTuple   tup;

    conDesc = heap_open(ConstraintRelationId, AccessShareLock);

    found = false;

    ScanKeyInit(&skey[0],
                Anum_pg_constraint_conname,
                BTEqualStrategyNumber, F_NAMEEQ,
                CStringGetDatum(conname));

    ScanKeyInit(&skey[1],
                Anum_pg_constraint_connamespace,
                BTEqualStrategyNumber, F_OIDEQ,
                ObjectIdGetDatum(objNamespace));

    conscan = systable_beginscan(conDesc, ConstraintNameNspIndexId, true,
                                 NULL, 2, skey);

    /* Several objects may share a name; look for one owned by objId */
    while (HeapTupleIsValid(tup = systable_getnext(conscan)))
    {
        Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tup);

        if (conCat == CONSTRAINT_RELATION && con->conrelid == objId)
        {
            found = true;
            break;
        }
        else if (conCat == CONSTRAINT_DOMAIN && con->contypid == objId)
        {
            found = true;
            break;
        }
    }

    systable_endscan(conscan);
    heap_close(conDesc, AccessShareLock);

    return found;
}
/*
 * ChooseConstraintName
 *    Pick a constraint name that does not collide with existing ones.
 *
 * The aim is a name that is unique within the given namespace.  Postgres
 * itself does not insist on that, but the SQL spec does and some
 * applications rely on it, so default names that would collide are avoided.
 *
 * name1, name2 and label are handed to makeObjectName() in the usual way,
 * except that label must not be NULL: when the plain label yields a name
 * that is already taken, a counter (1, 2, ...) is appended to the label and
 * the attempt is repeated.
 *
 * 'others' may list string names already picked earlier in the current
 * command but not yet visible in the catalogs; those are avoided as well.
 *
 * Note: a collision is still theoretically possible if somebody else picks
 * the same name concurrently.  That is unlikely to matter in practice,
 * particularly when an exclusive lock is held on the relation identified
 * by name1.
 *
 * The result is a palloc'd string.
 */
char *
ChooseConstraintName(const char *name1, const char *name2,
                     const char *label, Oid namespaceid,
                     List *others)
{
    Relation    constraintRel;
    char        suffix[NAMEDATALEN];
    char       *candidate = NULL;
    int         attempt = 0;

    constraintRel = heap_open(ConstraintRelationId, AccessShareLock);

    /* the first attempt uses the label exactly as given */
    StrNCpy(suffix, label, sizeof(suffix));

    for (;;)
    {
        bool        taken = false;
        ListCell   *cell;

        candidate = makeObjectName(name1, name2, suffix);

        /* reject names already chosen within this command */
        foreach(cell, others)
        {
            if (strcmp((char *) lfirst(cell), candidate) == 0)
            {
                taken = true;
                break;
            }
        }

        /* otherwise, look for the name in the catalog */
        if (!taken)
        {
            ScanKeyData keys[2];
            SysScanDesc scan;

            ScanKeyInit(&keys[0],
                        Anum_pg_constraint_conname,
                        BTEqualStrategyNumber, F_NAMEEQ,
                        CStringGetDatum(candidate));

            ScanKeyInit(&keys[1],
                        Anum_pg_constraint_connamespace,
                        BTEqualStrategyNumber, F_OIDEQ,
                        ObjectIdGetDatum(namespaceid));

            scan = systable_beginscan(constraintRel, ConstraintNameNspIndexId,
                                      true, NULL, 2, keys);

            taken = (HeapTupleIsValid(systable_getnext(scan)));

            systable_endscan(scan);
        }

        if (taken)
        {
            /* conflict: discard this candidate and try the next counter */
            pfree(candidate);
            snprintf(suffix, sizeof(suffix), "%s%d", label, ++attempt);
            continue;
        }

        break;
    }

    heap_close(constraintRel, AccessShareLock);

    return candidate;
}
/*
 * RemoveConstraintById
 *    Delete a single constraint record.
 *
 * conId is the OID of the pg_constraint row to delete.
 *
 * For a constraint owned by a relation, that relation is opened with
 * AccessExclusiveLock (the lock is kept when the relation is closed) and,
 * for a CHECK constraint, the relation's pg_class.relchecks count is
 * decremented.  Nothing extra is done for a domain constraint.  A row with
 * neither a valid conrelid nor a valid contypid raises an error.
 */
void
RemoveConstraintById(Oid conId)
{
    Relation    conDesc;
    HeapTuple   tup;
    Form_pg_constraint con;

    conDesc = heap_open(ConstraintRelationId, RowExclusiveLock);

    tup = SearchSysCache1(CONSTROID, ObjectIdGetDatum(conId));
    if (!HeapTupleIsValid(tup)) /* should not happen */
        elog(ERROR, "cache lookup failed for constraint %u", conId);
    con = (Form_pg_constraint) GETSTRUCT(tup);

    /*
     * Special processing depending on what the constraint is for.
     */
    if (OidIsValid(con->conrelid))
    {
        Relation    rel;

        /*
         * If the constraint is for a relation, open and exclusive-lock the
         * relation it's for.
         */
        rel = heap_open(con->conrelid, AccessExclusiveLock);

        /*
         * We need to update the relcheck count if it is a check constraint
         * being dropped.  This update will force backends to rebuild relcache
         * entries when we commit.
         */
        if (con->contype == CONSTRAINT_CHECK)
        {
            Relation    pgrel;
            HeapTuple   relTup;
            Form_pg_class classForm;

            pgrel = heap_open(RelationRelationId, RowExclusiveLock);
            /* Work on a copy, since the tuple is modified before the update */
            relTup = SearchSysCacheCopy1(RELOID,
                                         ObjectIdGetDatum(con->conrelid));
            if (!HeapTupleIsValid(relTup))
                elog(ERROR, "cache lookup failed for relation %u",
                     con->conrelid);
            classForm = (Form_pg_class) GETSTRUCT(relTup);

            if (classForm->relchecks == 0)  /* should not happen */
                elog(ERROR, "relation \"%s\" has relchecks = 0",
                     RelationGetRelationName(rel));
            classForm->relchecks--;

            CatalogTupleUpdate(pgrel, &relTup->t_self, relTup);

            heap_freetuple(relTup);

            heap_close(pgrel, RowExclusiveLock);
        }

        /* Keep lock on constraint's rel until end of xact */
        heap_close(rel, NoLock);
    }
    else if (OidIsValid(con->contypid))
    {
        /*
         * XXX for now, do nothing special when dropping a domain constraint
         *
         * Probably there should be some form of locking on the domain type,
         * but we have no such concept at the moment.
         */
    }
    else
        elog(ERROR, "constraint %u is not of a known type", conId);

    /* Fry the constraint itself */
    CatalogTupleDelete(conDesc, &tup->t_self);

    /* Clean up */
    ReleaseSysCache(tup);
    heap_close(conDesc, RowExclusiveLock);
}
/*
 * RenameConstraintById
 *    Give the constraint with OID conId the name newname.
 *
 * Note: this is not meant to be exposed to users directly; no permission
 * checks etc. are performed here.  At present the only caller is the code
 * that renames an index tied to a constraint, but the routine is kept a bit
 * more general in anticipation of ALTER TABLE RENAME CONSTRAINT.
 *
 * An error is raised if the owning relation or domain already has a
 * constraint called newname.
 */
void
RenameConstraintById(Oid conId, const char *newname)
{
    Relation    constraintRel;
    HeapTuple   contup;
    Form_pg_constraint conform;

    constraintRel = heap_open(ConstraintRelationId, RowExclusiveLock);

    contup = SearchSysCacheCopy1(CONSTROID, ObjectIdGetDatum(conId));
    if (!HeapTupleIsValid(contup))
        elog(ERROR, "cache lookup failed for constraint %u", conId);
    conform = (Form_pg_constraint) GETSTRUCT(contup);

    /*
     * The name clash has to be detected here explicitly --- at present no
     * unique index exists that would catch it for us.
     */
    if (OidIsValid(conform->conrelid))
    {
        if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
                                 conform->conrelid,
                                 conform->connamespace,
                                 newname))
            ereport(ERROR,
                    (errcode(ERRCODE_DUPLICATE_OBJECT),
                     errmsg("constraint \"%s\" for relation \"%s\" already exists",
                            newname, get_rel_name(conform->conrelid))));
    }

    if (OidIsValid(conform->contypid))
    {
        if (ConstraintNameIsUsed(CONSTRAINT_DOMAIN,
                                 conform->contypid,
                                 conform->connamespace,
                                 newname))
            ereport(ERROR,
                    (errcode(ERRCODE_DUPLICATE_OBJECT),
                     errmsg("constraint \"%s\" for domain %s already exists",
                            newname, format_type_be(conform->contypid))));
    }

    /* contup is our private copy, so it may be modified in place */
    namestrcpy(&(conform->conname), newname);

    CatalogTupleUpdate(constraintRel, &contup->t_self, contup);

    InvokeObjectPostAlterHook(ConstraintRelationId, conId, 0);

    heap_freetuple(contup);
    heap_close(constraintRel, RowExclusiveLock);
}
/*
 * AlterConstraintNamespaces
 *    Find any constraints belonging to the specified object,
 *    and move them to the specified new namespace.
 *
 * isType indicates whether the owning object is a type or a relation:
 * when true, ownerId is matched against contypid, otherwise against
 * conrelid.
 *
 * Constraints already present in objsMoved are skipped.  Every other
 * constraint found is added to objsMoved and reported through the
 * post-alter hook, whether or not its namespace actually had to change.
 */
void
AlterConstraintNamespaces(Oid ownerId, Oid oldNspId,
                          Oid newNspId, bool isType, ObjectAddresses *objsMoved)
{
    Relation    conRel;
    ScanKeyData key[1];
    SysScanDesc scan;
    HeapTuple   tup;

    conRel = heap_open(ConstraintRelationId, RowExclusiveLock);

    if (isType)
    {
        ScanKeyInit(&key[0],
                    Anum_pg_constraint_contypid,
                    BTEqualStrategyNumber, F_OIDEQ,
                    ObjectIdGetDatum(ownerId));

        scan = systable_beginscan(conRel, ConstraintTypidIndexId, true,
                                  NULL, 1, key);
    }
    else
    {
        ScanKeyInit(&key[0],
                    Anum_pg_constraint_conrelid,
                    BTEqualStrategyNumber, F_OIDEQ,
                    ObjectIdGetDatum(ownerId));

        scan = systable_beginscan(conRel, ConstraintRelidIndexId, true,
                                  NULL, 1, key);
    }

    while (HeapTupleIsValid((tup = systable_getnext(scan))))
    {
        Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(tup);
        ObjectAddress thisobj;

        thisobj.classId = ConstraintRelationId;
        thisobj.objectId = HeapTupleGetOid(tup);
        thisobj.objectSubId = 0;

        if (object_address_present(&thisobj, objsMoved))
            continue;

        /* Don't update if the object is already part of the namespace */
        if (conform->connamespace == oldNspId && oldNspId != newNspId)
        {
            /* Modify a copy rather than the tuple returned by the scan */
            tup = heap_copytuple(tup);
            conform = (Form_pg_constraint) GETSTRUCT(tup);

            conform->connamespace = newNspId;

            CatalogTupleUpdate(conRel, &tup->t_self, tup);

            /*
             * Note: currently, the constraint will not have its own
             * dependency on the namespace, so we don't need to do
             * changeDependencyFor().
             */
        }

        InvokeObjectPostAlterHook(ConstraintRelationId, thisobj.objectId, 0);

        add_exact_object_address(&thisobj, objsMoved);
    }

    systable_endscan(scan);

    heap_close(conRel, RowExclusiveLock);
}
/*
 * get_relation_constraint_oid
 *    Find a constraint on the specified relation with the specified name.
 *    Returns constraint's OID.
 *
 * If no constraint of that name exists on the relation, an error is raised
 * unless missing_ok is true, in which case InvalidOid is returned.  If more
 * than one constraint on the relation carries the name, an error is raised
 * regardless of missing_ok.
 */
Oid
get_relation_constraint_oid(Oid relid, const char *conname, bool missing_ok)
{
    Relation    pg_constraint;
    HeapTuple   tuple;
    SysScanDesc scan;
    ScanKeyData skey[1];
    Oid         conOid = InvalidOid;

    /*
     * Fetch the constraint tuple from pg_constraint.  There may be more than
     * one match, because constraints are not required to have unique names;
     * if so, error out.
     */
    pg_constraint = heap_open(ConstraintRelationId, AccessShareLock);

    ScanKeyInit(&skey[0],
                Anum_pg_constraint_conrelid,
                BTEqualStrategyNumber, F_OIDEQ,
                ObjectIdGetDatum(relid));

    scan = systable_beginscan(pg_constraint, ConstraintRelidIndexId, true,
                              NULL, 1, skey);

    /* Keep scanning after the first hit so that duplicates are detected */
    while (HeapTupleIsValid(tuple = systable_getnext(scan)))
    {
        Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);

        if (strcmp(NameStr(con->conname), conname) == 0)
        {
            if (OidIsValid(conOid))
                ereport(ERROR,
                        (errcode(ERRCODE_DUPLICATE_OBJECT),
                         errmsg("table \"%s\" has multiple constraints named \"%s\"",
                                get_rel_name(relid), conname)));
            conOid = HeapTupleGetOid(tuple);
        }
    }

    systable_endscan(scan);

    /* If no such constraint exists, complain */
    if (!OidIsValid(conOid) && !missing_ok)
        ereport(ERROR,
                (errcode(ERRCODE_UNDEFINED_OBJECT),
                 errmsg("constraint \"%s\" for table \"%s\" does not exist",
                        conname, get_rel_name(relid))));

    heap_close(pg_constraint, AccessShareLock);

    return conOid;
}
/*
 * get_domain_constraint_oid
 *    Look up, by name, a constraint attached to the given domain and
 *    return its OID.
 *
 * When the domain has no constraint of that name, InvalidOid is returned
 * if missing_ok is true; otherwise an error is raised.  Finding two or
 * more constraints with the name is always an error.
 */
Oid
get_domain_constraint_oid(Oid typid, const char *conname, bool missing_ok)
{
    Oid         result = InvalidOid;
    Relation    constraintRel;
    ScanKeyData key[1];
    SysScanDesc domscan;
    HeapTuple   contup;

    /*
     * Walk the domain's rows in pg_constraint.  Constraint names need not be
     * unique, so several rows could match; that case is reported as an
     * error.
     */
    constraintRel = heap_open(ConstraintRelationId, AccessShareLock);

    ScanKeyInit(&key[0],
                Anum_pg_constraint_contypid,
                BTEqualStrategyNumber, F_OIDEQ,
                ObjectIdGetDatum(typid));

    domscan = systable_beginscan(constraintRel, ConstraintTypidIndexId, true,
                                 NULL, 1, key);

    while (HeapTupleIsValid(contup = systable_getnext(domscan)))
    {
        Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(contup);

        /* ignore constraints with a different name */
        if (strcmp(NameStr(conform->conname), conname) != 0)
            continue;

        /* a second match means the name is ambiguous */
        if (OidIsValid(result))
            ereport(ERROR,
                    (errcode(ERRCODE_DUPLICATE_OBJECT),
                     errmsg("domain \"%s\" has multiple constraints named \"%s\"",
                            format_type_be(typid), conname)));

        result = HeapTupleGetOid(contup);
    }

    systable_endscan(domscan);

    /* Complain about a missing constraint unless the caller tolerates it */
    if (!missing_ok && !OidIsValid(result))
        ereport(ERROR,
                (errcode(ERRCODE_UNDEFINED_OBJECT),
                 errmsg("constraint \"%s\" for domain \"%s\" does not exist",
                        conname, format_type_be(typid))));

    heap_close(constraintRel, AccessShareLock);

    return result;
}
/*
 * get_primary_key_attnos
 *    Identify the columns in a relation's primary key, if any.
 *
 * Returns a Bitmapset of the column attnos of the primary key's columns,
 * with attnos being offset by FirstLowInvalidHeapAttributeNumber so that
 * system columns can be represented.
 *
 * If there is no primary key, return NULL.  We also return NULL if the pkey
 * constraint is deferrable and deferrableOk is false.
 *
 * *constraintOid is set to the OID of the pkey constraint, or InvalidOid
 * on failure.
 */
Bitmapset *
get_primary_key_attnos(Oid relid, bool deferrableOk, Oid *constraintOid)
{
    Bitmapset  *pkattnos = NULL;
    Relation    pg_constraint;
    HeapTuple   tuple;
    SysScanDesc scan;
    ScanKeyData skey[1];

    /* Set *constraintOid, to avoid complaints about uninitialized vars */
    *constraintOid = InvalidOid;

    /* Scan pg_constraint for constraints of the target rel */
    pg_constraint = heap_open(ConstraintRelationId, AccessShareLock);

    ScanKeyInit(&skey[0],
                Anum_pg_constraint_conrelid,
                BTEqualStrategyNumber, F_OIDEQ,
                ObjectIdGetDatum(relid));

    scan = systable_beginscan(pg_constraint, ConstraintRelidIndexId, true,
                              NULL, 1, skey);

    while (HeapTupleIsValid(tuple = systable_getnext(scan)))
    {
        Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
        Datum       adatum;
        bool        isNull;
        ArrayType  *arr;
        int16      *attnums;
        int         numkeys;
        int         i;

        /* Skip constraints that are not PRIMARY KEYs */
        if (con->contype != CONSTRAINT_PRIMARY)
            continue;

        /*
         * If the primary key is deferrable, but we've been instructed to
         * ignore deferrable constraints, then we might as well give up
         * searching, since there can only be a single primary key on a table.
         */
        if (con->condeferrable && !deferrableOk)
            break;

        /* Extract the conkey array, ie, attnums of PK's columns */
        adatum = heap_getattr(tuple, Anum_pg_constraint_conkey,
                              RelationGetDescr(pg_constraint), &isNull);
        if (isNull)
            elog(ERROR, "null conkey for constraint %u",
                 HeapTupleGetOid(tuple));
        arr = DatumGetArrayTypeP(adatum);   /* ensure not toasted */

        /*
         * Validate the number of dimensions before looking at the dimension
         * vector: ARR_DIMS(arr)[0] is only examined once ARR_NDIM(arr) is
         * known to be 1 (the || operator short-circuits).
         */
        if (ARR_NDIM(arr) != 1 ||
            ARR_DIMS(arr)[0] < 0 ||
            ARR_HASNULL(arr) ||
            ARR_ELEMTYPE(arr) != INT2OID)
            elog(ERROR, "conkey is not a 1-D smallint array");
        numkeys = ARR_DIMS(arr)[0];
        attnums = (int16 *) ARR_DATA_PTR(arr);

        /* Construct the result value */
        for (i = 0; i < numkeys; i++)
        {
            pkattnos = bms_add_member(pkattnos,
                                      attnums[i] - FirstLowInvalidHeapAttributeNumber);
        }
        *constraintOid = HeapTupleGetOid(tuple);

        /* No need to search further */
        break;
    }

    systable_endscan(scan);

    heap_close(pg_constraint, AccessShareLock);

    return pkattnos;
}
/*
 * check_functional_grouping
 *    Decide whether a relation is provably functionally dependent on a set
 *    of grouping columns.  On success, return TRUE and append to
 *    *constraintDeps the pg_constraint OIDs of the constraints the proof
 *    relies on.
 *
 * grouping_columns is a list of grouping expressions; columns of the rel of
 * interest appear in it as Vars with the given varno/varlevelsup.
 *
 * For now the only test is whether the rel has a primary key whose columns
 * are a subset of grouping_columns.  Plain unique constraints could be used
 * too when all their columns are known not null, but then the not-null-ness
 * would have to be represented among the constraints added to
 * *constraintDeps.  FIXME whenever not-null constraints get represented
 * in pg_constraint.
 */
bool
check_functional_grouping(Oid relid,
                          Index varno, Index varlevelsup,
                          List *grouping_columns,
                          List **constraintDeps)
{
    Oid         pkeyOid;
    Bitmapset  *pkeyCols;
    Bitmapset  *groupedCols = NULL;
    ListCell   *cell;

    /* Without a (non-deferrable) PK there is nothing to base a proof on */
    pkeyCols = get_primary_key_attnos(relid, false, &pkeyOid);
    if (pkeyCols == NULL)
        return false;

    /* Collect the rel's columns that are mentioned in grouping_columns */
    foreach(cell, grouping_columns)
    {
        Var        *var = (Var *) lfirst(cell);

        if (IsA(var, Var) &&
            var->varno == varno &&
            var->varlevelsup == varlevelsup)
            groupedCols = bms_add_member(groupedCols,
                                         var->varattno - FirstLowInvalidHeapAttributeNumber);
    }

    /* Some PK column is not grouped on: no functional dependency */
    if (!bms_is_subset(pkeyCols, groupedCols))
        return false;

    /* Every PK column is grouped on, so record the PK as the proof */
    *constraintDeps = lappend_oid(*constraintDeps, pkeyOid);
    return true;
}