postgresql/src/backend/catalog/heap.c

2501 lines
70 KiB
C
Raw Normal View History

/*-------------------------------------------------------------------------
*
* heap.c
* code to create and destroy POSTGRES heap relations
*
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/catalog/heap.c,v 1.336 2008/07/30 19:35:13 tgl Exp $
*
*
* INTERFACE ROUTINES
* heap_create() - Create an uncataloged heap relation
* heap_create_with_catalog() - Create a cataloged relation
* heap_drop_with_catalog() - Removes named relation from catalogs
*
* NOTES
* this code taken from access/heap/create.c, which contains
* the old heap_create_with_catalog, amcreate, and amdestroy.
* those routines will soon call these routines using the function
* manager,
* just like the poorly named "NewXXX" routines do. The
* "New" routines are all going to die soon, once and for all!
* -cim 1/13/91
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/sysattr.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/dependency.h"
#include "catalog/heap.h"
#include "catalog/index.h"
#include "catalog/indexing.h"
#include "catalog/pg_attrdef.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_statistic.h"
#include "catalog/pg_tablespace.h"
#include "catalog/pg_type.h"
#include "catalog/pg_type_fn.h"
#include "commands/tablecmds.h"
#include "commands/typecmds.h"
#include "miscadmin.h"
#include "optimizer/clauses.h"
#include "optimizer/var.h"
#include "parser/parse_coerce.h"
1999-07-16 07:00:38 +02:00
#include "parser/parse_expr.h"
#include "parser/parse_relation.h"
#include "storage/bufmgr.h"
#include "storage/smgr.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/relcache.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/tqual.h"
/*
 * Prototypes for routines private to this file.
 */
static void AddNewRelationTuple(Relation pg_class_desc,
					Relation new_rel_desc,
					Oid new_rel_oid, Oid new_type_oid,
					Oid relowner,
					char relkind,
					Datum reloptions);
static Oid AddNewRelationType(const char *typeName,
				   Oid typeNamespace,
				   Oid new_rel_oid,
				   char new_rel_kind,
				   Oid new_array_type);
static void RelationRemoveInheritance(Oid relid);
static void StoreRelCheck(Relation rel, char *ccname, Node *expr,
			  bool is_local, int inhcount);
static void StoreConstraints(Relation rel, List *cooked_constraints);
static bool MergeWithExistingConstraint(Relation rel, char *ccname, Node *expr,
							bool allow_merge, bool is_local);
static void SetRelationNumChecks(Relation rel, int numchecks);
static Node *cookConstraint(ParseState *pstate,
			   Node *raw_constraint,
			   char *relname);
static List *insert_ordered_unique_oid(List *list, Oid datum);
/* ----------------------------------------------------------------
* XXX UGLY HARD CODED BADNESS FOLLOWS XXX
*
* these should all be moved to someplace in the lib/catalog
* module, if not obliterated first.
* ----------------------------------------------------------------
*/
/*
 * Prototype pg_attribute entries for the system attributes.
 *
 * AddNewAttributeTuples() copies these into pg_attribute for each new
 * relation that is neither a view nor a composite type (leaving out the
 * "oid" entry when the relation has no OIDs), and
 * SystemAttributeDefinition() / SystemAttributeByName() return pointers
 * to them.
 *
 * The initializers are positional, so they must follow the field order of
 * FormData_pg_attribute, which is declared outside this file.
 * NOTE(review): the second value is the attribute name, the third its type
 * OID, the fifth its length and the sixth its attribute number; the meaning
 * of the remaining values is not visible here -- confirm against the struct
 * declaration before editing.
 *
 * Note:
 *		Should the system special case these attributes in the future?
 *		Advantage:	consume much less space in the ATTRIBUTE relation.
 *		Disadvantage:  special cases will be all over the place.
 */
/* ctid: the only entry whose tenth and twelfth values are false and 's' */
static FormData_pg_attribute a1 = {
0, {"ctid"}, TIDOID, 0, sizeof(ItemPointerData),
SelfItemPointerAttributeNumber, 0, -1, -1,
false, 'p', 's', true, false, false, true, 0
};
/* oid: skipped by the lookup and insertion routines when relhasoids is false */
static FormData_pg_attribute a2 = {
0, {"oid"}, OIDOID, 0, sizeof(Oid),
ObjectIdAttributeNumber, 0, -1, -1,
true, 'p', 'i', true, false, false, true, 0
};
/* xmin */
static FormData_pg_attribute a3 = {
0, {"xmin"}, XIDOID, 0, sizeof(TransactionId),
MinTransactionIdAttributeNumber, 0, -1, -1,
true, 'p', 'i', true, false, false, true, 0
};
/* cmin */
static FormData_pg_attribute a4 = {
0, {"cmin"}, CIDOID, 0, sizeof(CommandId),
MinCommandIdAttributeNumber, 0, -1, -1,
true, 'p', 'i', true, false, false, true, 0
};
/* xmax */
static FormData_pg_attribute a5 = {
0, {"xmax"}, XIDOID, 0, sizeof(TransactionId),
MaxTransactionIdAttributeNumber, 0, -1, -1,
true, 'p', 'i', true, false, false, true, 0
};
/* cmax */
static FormData_pg_attribute a6 = {
0, {"cmax"}, CIDOID, 0, sizeof(CommandId),
MaxCommandIdAttributeNumber, 0, -1, -1,
true, 'p', 'i', true, false, false, true, 0
};
/*
 * We decided to call this attribute "tableoid" rather than say
 * "classoid" on the basis that in the future there may be more than one
 * table of a particular class/type. In any case table is still the word
 * used in SQL.
 */
static FormData_pg_attribute a7 = {
0, {"tableoid"}, OIDOID, 0, sizeof(Oid),
TableOidAttributeNumber, 0, -1, -1,
true, 'p', 'i', true, false, false, true, 0
};
/*
 * Lookup table over the prototypes above.  SystemAttributeDefinition()
 * indexes it with (-attno - 1), so the entries must stay in the order of
 * their attribute numbers -- presumably -1 (ctid) through -7 (tableoid);
 * confirm against access/sysattr.h.
 */
static const Form_pg_attribute SysAtt[] = {&a1, &a2, &a3, &a4, &a5, &a6, &a7};
/*
 * SystemAttributeDefinition
 *		Fetch the prototype pg_attribute entry for a system attribute number.
 *
 * attno must be negative and lie within the range covered by SysAtt[]; the
 * OID attribute is additionally rejected when relhasoids is false.  Any
 * invalid attno is reported with elog(ERROR), since that would only happen
 * if there's a problem upstream.
 */
Form_pg_attribute
SystemAttributeDefinition(AttrNumber attno, bool relhasoids)
{
	int			nsysatts = (int) lengthof(SysAtt);
	bool		valid;

	/* must be negative, and no further out than the end of the table */
	valid = (attno < 0 && attno >= -nsysatts);

	/* the OID attribute is not available when relhasoids is false */
	if (valid && !relhasoids && attno == ObjectIdAttributeNumber)
		valid = false;

	if (!valid)
		elog(ERROR, "invalid system attribute number %d", attno);

	/* attribute numbers -1, -2, ... map to array slots 0, 1, ... */
	return SysAtt[-attno - 1];
}
/*
 * SystemAttributeByName
 *		Find the prototype pg_attribute entry for a system attribute name.
 *
 * Returns the matching SysAtt[] entry, or NULL if attname does not name a
 * system attribute.  The OID attribute is never matched when relhasoids is
 * false.
 */
Form_pg_attribute
SystemAttributeByName(const char *attname, bool relhasoids)
{
	int			nsysatts = (int) lengthof(SysAtt);
	int			idx;

	for (idx = 0; idx < nsysatts; idx++)
	{
		Form_pg_attribute candidate = SysAtt[idx];

		/* ignore the OID entry entirely if the rel has no OIDs */
		if (!relhasoids && candidate->attnum == ObjectIdAttributeNumber)
			continue;

		if (strcmp(NameStr(candidate->attname), attname) == 0)
			return candidate;
	}

	return NULL;
}
/* ----------------------------------------------------------------
* XXX END OF UGLY HARD CODED BADNESS XXX
* ---------------------------------------------------------------- */
/* ----------------------------------------------------------------
 *		heap_create		- Create an uncataloged heap relation
 *
 *		Builds a relcache entry for the relation and, for relkinds that
 *		have physical storage, asks the storage manager to create the
 *		disk file.  No catalog rows are inserted here.
 *
 *		Creating a relation in a system or toast namespace during normal
 *		processing is refused with ereport(ERROR) unless
 *		allow_system_table_mods is true.
 *
 *		Note API change: the caller must now always provide the OID
 *		to use for the relation.
 *
 *		rel->rd_rel is initialized by RelationBuildLocalRelation,
 *		and is mostly zeroes at return.
 * ----------------------------------------------------------------
 */
Relation
heap_create(const char *relname,
			Oid relnamespace,
			Oid reltablespace,
			Oid relid,
			TupleDesc tupDesc,
			char relkind,
			bool shared_relation,
			bool allow_system_table_mods)
{
	bool		create_storage;
	Relation	rel;

	/* The caller must have provided an OID for the relation. */
	Assert(OidIsValid(relid));

	/*
	 * sanity checks
	 */
	if (!allow_system_table_mods &&
		(IsSystemNamespace(relnamespace) || IsToastNamespace(relnamespace)) &&
		IsNormalProcessingMode())
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("permission denied to create \"%s.%s\"",
						get_namespace_name(relnamespace), relname),
		errdetail("System catalog modifications are currently disallowed.")));

	/*
	 * Decide if we need storage or not, and handle a couple other special
	 * cases for particular relkinds.
	 */
	switch (relkind)
	{
		case RELKIND_VIEW:
		case RELKIND_COMPOSITE_TYPE:
			create_storage = false;

			/*
			 * Force reltablespace to zero if the relation has no physical
			 * storage.  This is mainly just for cleanliness' sake.
			 */
			reltablespace = InvalidOid;
			break;
		case RELKIND_SEQUENCE:
			create_storage = true;

			/*
			 * Force reltablespace to zero for sequences, since we don't
			 * support moving them around into different tablespaces.
			 */
			reltablespace = InvalidOid;
			break;
		default:
			create_storage = true;
			break;
	}

	/*
	 * Never allow a pg_class entry to explicitly specify the database's
	 * default tablespace in reltablespace; force it to zero instead. This
	 * ensures that if the database is cloned with a different default
	 * tablespace, the pg_class entry will still match where CREATE DATABASE
	 * will put the physically copied relation.
	 *
	 * Yes, this is a bit of a hack.
	 */
	if (reltablespace == MyDatabaseTableSpace)
		reltablespace = InvalidOid;

	/*
	 * build the relcache entry.
	 */
	rel = RelationBuildLocalRelation(relname,
									 relnamespace,
									 tupDesc,
									 relid,
									 reltablespace,
									 shared_relation);

	/*
	 * have the storage manager create the relation's disk file, if needed.
	 */
	if (create_storage)
	{
		Assert(rel->rd_smgr == NULL);
		RelationOpenSmgr(rel);
		smgrcreate(rel->rd_smgr, rel->rd_istemp, false);
	}

	return rel;
}
/* ----------------------------------------------------------------
* heap_create_with_catalog - Create a cataloged relation
*
* this is done in multiple steps:
*
* 1) CheckAttributeNamesTypes() is used to make certain the tuple
* descriptor contains a valid set of attribute names and types
*
* 2) pg_class is opened and get_relname_relid()
* performs a scan to ensure that no relation with the
* same name already exists.
*
* 3) heap_create() is called to create the new relation on disk.
*
* 4) TypeCreate() is called to define a new type corresponding
* to the new relation.
*
* 5) AddNewRelationTuple() is called to register the
* relation in pg_class.
*
* 6) AddNewAttributeTuples() is called to register the
* new relation's schema in pg_attribute.
*
* 7) StoreConstraints() is called - vadim 08/22/97
*
* 8) the relations are closed and the new relation's oid
* is returned.
*
* ----------------------------------------------------------------
*/
/* --------------------------------
 *		CheckAttributeNamesTypes
 *
 *		this is used to make certain the tuple descriptor contains a
 *		valid set of attribute names and datatypes.  a problem simply
 *		generates ereport(ERROR) which aborts the current transaction.
 *
 *		Checks made, in order: the column count is within
 *		MaxHeapAttributeNumber, no column is named like a system column
 *		(skipped for views and composite types), no name is repeated,
 *		and every column type is acceptable to CheckAttributeType().
 * --------------------------------
 */
void
CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind)
{
	int			i;
	int			j;
	int			natts = tupdesc->natts;

	/* Sanity check on column count */
	if (natts < 0 || natts > MaxHeapAttributeNumber)
		ereport(ERROR,
				(errcode(ERRCODE_TOO_MANY_COLUMNS),
				 errmsg("tables can have at most %d columns",
						MaxHeapAttributeNumber)));

	/*
	 * first check for collision with system attribute names
	 *
	 * Skip this for a view or type relation, since those don't have system
	 * attributes.
	 */
	if (relkind != RELKIND_VIEW && relkind != RELKIND_COMPOSITE_TYPE)
	{
		for (i = 0; i < natts; i++)
		{
			if (SystemAttributeByName(NameStr(tupdesc->attrs[i]->attname),
									  tupdesc->tdhasoid) != NULL)
				ereport(ERROR,
						(errcode(ERRCODE_DUPLICATE_COLUMN),
						 errmsg("column name \"%s\" conflicts with a system column name",
								NameStr(tupdesc->attrs[i]->attname))));
		}
	}

	/*
	 * next check for repeated attribute names
	 *
	 * Each column is compared against all the columns before it.
	 */
	for (i = 1; i < natts; i++)
	{
		for (j = 0; j < i; j++)
		{
			if (strcmp(NameStr(tupdesc->attrs[j]->attname),
					   NameStr(tupdesc->attrs[i]->attname)) == 0)
				ereport(ERROR,
						(errcode(ERRCODE_DUPLICATE_COLUMN),
						 errmsg("column name \"%s\" specified more than once",
								NameStr(tupdesc->attrs[j]->attname))));
		}
	}

	/*
	 * next check the attribute types
	 */
	for (i = 0; i < natts; i++)
	{
		CheckAttributeType(NameStr(tupdesc->attrs[i]->attname),
						   tupdesc->attrs[i]->atttypid);
	}
}
/* --------------------------------
 *		CheckAttributeType
 *
 *		Verify that the proposed datatype of an attribute is legal.
 *		This is needed because there are types (and pseudo-types)
 *		in the catalogs that we do not support as elements of real tuples.
 *
 *		attname is used only for messages.  A column of type "unknown"
 *		draws a WARNING; a pseudo-type column draws an ERROR (except
 *		ANYARRAY when not running under the postmaster); a composite
 *		type is checked recursively, column by column.
 * --------------------------------
 */
void
CheckAttributeType(const char *attname, Oid atttypid)
{
	char		att_typtype = get_typtype(atttypid);

	if (atttypid == UNKNOWNOID)
	{
		/*
		 * Warn user, but don't fail, if column to be created has UNKNOWN type
		 * (usually as a result of a 'retrieve into' - jolly)
		 */
		ereport(WARNING,
				(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
				 errmsg("column \"%s\" has type \"unknown\"", attname),
				 errdetail("Proceeding with relation creation anyway.")));
	}
	else if (att_typtype == TYPTYPE_PSEUDO)
	{
		/*
		 * Refuse any attempt to create a pseudo-type column, except for a
		 * special hack for pg_statistic: allow ANYARRAY during initdb
		 */
		if (atttypid != ANYARRAYOID || IsUnderPostmaster)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
					 errmsg("column \"%s\" has pseudo-type %s",
							attname, format_type_be(atttypid))));
	}
	else if (att_typtype == TYPTYPE_COMPOSITE)
	{
		/*
		 * For a composite type, recurse into its attributes.  You might think
		 * this isn't necessary, but since we allow system catalogs to break
		 * the rule, we have to guard against the case.
		 */
		Relation	relation;
		TupleDesc	tupdesc;
		int			i;

		relation = relation_open(get_typ_typrelid(atttypid), AccessShareLock);

		tupdesc = RelationGetDescr(relation);

		for (i = 0; i < tupdesc->natts; i++)
		{
			Form_pg_attribute attr = tupdesc->attrs[i];

			/* dropped columns are not checked */
			if (attr->attisdropped)
				continue;
			CheckAttributeType(NameStr(attr->attname), attr->atttypid);
		}

		relation_close(relation, AccessShareLock);
	}
}
/* --------------------------------
 *		AddNewAttributeTuples
 *
 *		this registers the new relation's schema by adding
 *		tuples to pg_attribute.
 *
 *		One row is inserted per user column of tupdesc, together with a
 *		normal dependency of that column on its datatype.  Unless the
 *		relation is a view or composite type, one row is also inserted
 *		per system attribute in SysAtt[] (leaving out "oid" when the
 *		tuple descriptor has no OIDs).  oidislocal and oidinhcount supply
 *		the inheritance info stored for the "oid" row.
 *
 *		Note that the user-column entries of tupdesc are modified in
 *		place: attrelid, attstattarget and attcacheoff are overwritten.
 * --------------------------------
 */
static void
AddNewAttributeTuples(Oid new_rel_oid,
					  TupleDesc tupdesc,
					  char relkind,
					  bool oidislocal,
					  int oidinhcount)
{
	const Form_pg_attribute *dpp;
	int			i;
	HeapTuple	tup;
	Relation	rel;
	CatalogIndexState indstate;
	int			natts = tupdesc->natts;
	ObjectAddress myself,
				referenced;

	/*
	 * open pg_attribute and its indexes.
	 */
	rel = heap_open(AttributeRelationId, RowExclusiveLock);

	indstate = CatalogOpenIndexes(rel);

	/*
	 * First we add the user attributes.  This is also a convenient place to
	 * add dependencies on their datatypes.
	 */
	dpp = tupdesc->attrs;
	for (i = 0; i < natts; i++)
	{
		/* Fill in the correct relation OID */
		(*dpp)->attrelid = new_rel_oid;
		/* Make sure these are OK, too */
		(*dpp)->attstattarget = -1;
		(*dpp)->attcacheoff = -1;

		tup = heap_addheader(Natts_pg_attribute,
							 false,
							 ATTRIBUTE_TUPLE_SIZE,
							 (void *) *dpp);

		simple_heap_insert(rel, tup);

		CatalogIndexInsert(indstate, tup);

		heap_freetuple(tup);

		/* column number i + 1 of the new relation depends on its type */
		myself.classId = RelationRelationId;
		myself.objectId = new_rel_oid;
		myself.objectSubId = i + 1;
		referenced.classId = TypeRelationId;
		referenced.objectId = (*dpp)->atttypid;
		referenced.objectSubId = 0;
		recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);

		dpp++;
	}

	/*
	 * Next we add the system attributes.  Skip OID if rel has no OIDs. Skip
	 * all for a view or type relation.  We don't bother with making datatype
	 * dependencies here, since presumably all these types are pinned.
	 */
	if (relkind != RELKIND_VIEW && relkind != RELKIND_COMPOSITE_TYPE)
	{
		dpp = SysAtt;

		/*
		 * Bound the loop by the array actually being walked, the same way
		 * SystemAttributeDefinition() and SystemAttributeByName() do.
		 */
		for (i = 0; i < (int) lengthof(SysAtt); i++)
		{
			if (tupdesc->tdhasoid ||
				(*dpp)->attnum != ObjectIdAttributeNumber)
			{
				Form_pg_attribute attStruct;

				tup = heap_addheader(Natts_pg_attribute,
									 false,
									 ATTRIBUTE_TUPLE_SIZE,
									 (void *) *dpp);
				attStruct = (Form_pg_attribute) GETSTRUCT(tup);

				/* Fill in the correct relation OID in the copied tuple */
				attStruct->attrelid = new_rel_oid;

				/* Fill in correct inheritance info for the OID column */
				if (attStruct->attnum == ObjectIdAttributeNumber)
				{
					attStruct->attislocal = oidislocal;
					attStruct->attinhcount = oidinhcount;
				}

				/*
				 * attstattarget and attcacheoff need no adjustment here,
				 * since they should be OK in the constant data anyway.
				 */

				simple_heap_insert(rel, tup);

				CatalogIndexInsert(indstate, tup);

				heap_freetuple(tup);
			}
			dpp++;
		}
	}

	/*
	 * clean up
	 */
	CatalogCloseIndexes(indstate);

	heap_close(rel, RowExclusiveLock);
}
/* --------------------------------
 *		InsertPgClassTuple
 *
 *		Build a pg_class row for a new relation and insert it.
 *
 *		pg_class must already be open and locked by the caller.  All
 *		fixed-width columns are copied from new_rel_desc->rd_rel; the
 *		variable-width columns, which a cached reldesc does not carry,
 *		are handled separately: relacl is always stored as NULL (i.e.,
 *		default permissions), and reloptions is the passed-in text array,
 *		or NULL if the caller passed (Datum) 0.  The row is given the OID
 *		new_rel_oid.
 * --------------------------------
 */
void
InsertPgClassTuple(Relation pg_class_desc,
				   Relation new_rel_desc,
				   Oid new_rel_oid,
				   Datum reloptions)
{
	Form_pg_class classform = new_rel_desc->rd_rel;
	Datum		values[Natts_pg_class];
	char		nulls[Natts_pg_class];
	HeapTuple	newtuple;

	/* Begin with every column zeroed and flagged ' ', that is, not null */
	memset(values, 0, sizeof(values));
	memset(nulls, ' ', sizeof(nulls));

	/* Copy the fixed-width columns over, one by one */
	values[Anum_pg_class_relname - 1] = NameGetDatum(&classform->relname);
	values[Anum_pg_class_relnamespace - 1] = ObjectIdGetDatum(classform->relnamespace);
	values[Anum_pg_class_reltype - 1] = ObjectIdGetDatum(classform->reltype);
	values[Anum_pg_class_relowner - 1] = ObjectIdGetDatum(classform->relowner);
	values[Anum_pg_class_relam - 1] = ObjectIdGetDatum(classform->relam);
	values[Anum_pg_class_relfilenode - 1] = ObjectIdGetDatum(classform->relfilenode);
	values[Anum_pg_class_reltablespace - 1] = ObjectIdGetDatum(classform->reltablespace);
	values[Anum_pg_class_relpages - 1] = Int32GetDatum(classform->relpages);
	values[Anum_pg_class_reltuples - 1] = Float4GetDatum(classform->reltuples);
	values[Anum_pg_class_reltoastrelid - 1] = ObjectIdGetDatum(classform->reltoastrelid);
	values[Anum_pg_class_reltoastidxid - 1] = ObjectIdGetDatum(classform->reltoastidxid);
	values[Anum_pg_class_relhasindex - 1] = BoolGetDatum(classform->relhasindex);
	values[Anum_pg_class_relisshared - 1] = BoolGetDatum(classform->relisshared);
	values[Anum_pg_class_relkind - 1] = CharGetDatum(classform->relkind);
	values[Anum_pg_class_relnatts - 1] = Int16GetDatum(classform->relnatts);
	values[Anum_pg_class_relchecks - 1] = Int16GetDatum(classform->relchecks);
	values[Anum_pg_class_reltriggers - 1] = Int16GetDatum(classform->reltriggers);
	values[Anum_pg_class_relukeys - 1] = Int16GetDatum(classform->relukeys);
	values[Anum_pg_class_relfkeys - 1] = Int16GetDatum(classform->relfkeys);
	values[Anum_pg_class_relrefs - 1] = Int16GetDatum(classform->relrefs);
	values[Anum_pg_class_relhasoids - 1] = BoolGetDatum(classform->relhasoids);
	values[Anum_pg_class_relhaspkey - 1] = BoolGetDatum(classform->relhaspkey);
	values[Anum_pg_class_relhasrules - 1] = BoolGetDatum(classform->relhasrules);
	values[Anum_pg_class_relhassubclass - 1] = BoolGetDatum(classform->relhassubclass);
	values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(classform->relfrozenxid);

	/* relacl starts out NULL, meaning empty (default) permissions */
	nulls[Anum_pg_class_relacl - 1] = 'n';

	/* reloptions is NULL too unless the caller handed us something */
	if (reloptions == (Datum) 0)
		nulls[Anum_pg_class_reloptions - 1] = 'n';
	else
		values[Anum_pg_class_reloptions - 1] = reloptions;

	newtuple = heap_formtuple(RelationGetDescr(pg_class_desc), values, nulls);

	/*
	 * Force the row's OID to the one already chosen for the relation.  Sure
	 * would be embarrassing to do this sort of thing in polite company.
	 */
	HeapTupleSetOid(newtuple, new_rel_oid);

	/* insert the row, bring the indexes up to date, and release our copy */
	simple_heap_insert(pg_class_desc, newtuple);
	CatalogUpdateIndexes(pg_class_desc, newtuple);
	heap_freetuple(newtuple);
}
/* --------------------------------
 *		AddNewRelationTuple
 *
 *		this registers the new relation in the catalogs by
 *		adding a tuple to pg_class.
 *
 *		Before the insertion, the relation descriptor's rd_rel is filled
 *		in with the initial size estimates, relfrozenxid, owner, rowtype
 *		OID and relkind, and the tuple descriptor's tdtypeid is set to
 *		the rowtype OID.  The row itself is built and inserted by
 *		InsertPgClassTuple().
 * --------------------------------
 */
static void
AddNewRelationTuple(Relation pg_class_desc,
					Relation new_rel_desc,
					Oid new_rel_oid,
					Oid new_type_oid,
					Oid relowner,
					char relkind,
					Datum reloptions)
{
	Form_pg_class new_rel_reltup;

	/*
	 * first we update some of the information in our uncataloged relation's
	 * relation descriptor.
	 */
	new_rel_reltup = new_rel_desc->rd_rel;

	switch (relkind)
	{
		case RELKIND_SEQUENCE:
			/* Sequences always have a known size */
			new_rel_reltup->relpages = 1;
			new_rel_reltup->reltuples = 1;
			break;
		default:

			/*
			 * Tables, indexes and toast tables are real, but as yet empty;
			 * views, etc, have no disk storage.  Either way the initial
			 * size is zero.
			 */
			new_rel_reltup->relpages = 0;
			new_rel_reltup->reltuples = 0;
			break;
	}

	/* Initialize relfrozenxid */
	if (relkind == RELKIND_RELATION ||
		relkind == RELKIND_TOASTVALUE)
	{
		/*
		 * Initialize to the minimum XID that could put tuples in the table.
		 * We know that no xacts older than RecentXmin are still running, so
		 * that will do.
		 */
		new_rel_reltup->relfrozenxid = RecentXmin;
	}
	else
	{
		/*
		 * Other relation types will not contain XIDs, so set relfrozenxid to
		 * InvalidTransactionId.  (Note: a sequence does contain a tuple, but
		 * we force its xmin to be FrozenTransactionId always; see
		 * commands/sequence.c.)
		 */
		new_rel_reltup->relfrozenxid = InvalidTransactionId;
	}

	new_rel_reltup->relowner = relowner;
	new_rel_reltup->reltype = new_type_oid;
	new_rel_reltup->relkind = relkind;

	new_rel_desc->rd_att->tdtypeid = new_type_oid;

	/* Now build and insert the tuple */
	InsertPgClassTuple(pg_class_desc, new_rel_desc, new_rel_oid, reloptions);
}
/* --------------------------------
 *		AddNewRelationType -
 *
 *		define a composite type corresponding to the new relation
 *
 *		Thin wrapper around TypeCreate(): the type takes the given name
 *		and namespace, is tied to relation new_rel_oid of kind
 *		new_rel_kind, and records new_array_type as its array type (if
 *		any).  Returns whatever TypeCreate() returns.
 * --------------------------------
 */
static Oid
AddNewRelationType(const char *typeName,
				   Oid typeNamespace,
				   Oid new_rel_oid,
				   char new_rel_kind,
				   Oid new_array_type)
{
	return
		TypeCreate(InvalidOid,	/* no predetermined OID */
				   typeName,	/* type name */
				   typeNamespace,		/* type namespace */
				   new_rel_oid, /* relation oid */
				   new_rel_kind,	/* relation kind */
				   -1,			/* internal size (varlena) */
				   TYPTYPE_COMPOSITE,	/* type-type (composite) */
				   TYPCATEGORY_COMPOSITE,	/* type-category (ditto) */
				   false,		/* composite types are never preferred */
				   DEFAULT_TYPDELIM,	/* default array delimiter */
				   F_RECORD_IN, /* input procedure */
				   F_RECORD_OUT,	/* output procedure */
				   F_RECORD_RECV,		/* receive procedure */
				   F_RECORD_SEND,		/* send procedure */
				   InvalidOid,	/* typmodin procedure - none */
				   InvalidOid,	/* typmodout procedure - none */
				   InvalidOid,	/* analyze procedure - default */
				   InvalidOid,	/* array element type - irrelevant */
				   false,		/* this is not an array type */
				   new_array_type,		/* array type if any */
				   InvalidOid,	/* domain base type - irrelevant */
				   NULL,		/* default value - none */
				   NULL,		/* default binary representation */
				   false,		/* passed by reference */
				   'd',			/* alignment - must be the largest! */
				   'x',			/* fully TOASTable */
				   -1,			/* typmod */
				   0,			/* array dimensions for typBaseType */
				   false);		/* Type NOT NULL */
}
/* --------------------------------
* heap_create_with_catalog
*
* creates a new cataloged relation. see comments above.
* --------------------------------
*/
Oid
heap_create_with_catalog(const char *relname,
Oid relnamespace,
Oid reltablespace,
Oid relid,
Oid ownerid,
TupleDesc tupdesc,
List *cooked_constraints,
char relkind,
bool shared_relation,
bool oidislocal,
int oidinhcount,
OnCommitAction oncommit,
Datum reloptions,
bool allow_system_table_mods)
{
Relation pg_class_desc;
Relation new_rel_desc;
Oid old_type_oid;
Oid new_type_oid;
2007-11-15 22:14:46 +01:00
Oid new_array_oid = InvalidOid;
1999-05-25 18:15:34 +02:00
pg_class_desc = heap_open(RelationRelationId, RowExclusiveLock);
/*
* sanity checks
*/
Assert(IsNormalProcessingMode() || IsBootstrapProcessingMode());
CheckAttributeNamesTypes(tupdesc, relkind);
if (get_relname_relid(relname, relnamespace))
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_TABLE),
errmsg("relation \"%s\" already exists", relname)));
2002-09-04 22:31:48 +02:00
/*
* Since we are going to create a rowtype as well, also check for
2007-11-15 22:14:46 +01:00
* collision with an existing type name. If there is one and it's an
* autogenerated array, we can rename it out of the way; otherwise we can
* at least give a good error message.
*/
old_type_oid = GetSysCacheOid(TYPENAMENSP,
CStringGetDatum(relname),
ObjectIdGetDatum(relnamespace),
0, 0);
if (OidIsValid(old_type_oid))
{
if (!moveArrayTypeName(old_type_oid, relname, relnamespace))
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("type \"%s\" already exists", relname),
2007-11-15 22:14:46 +01:00
errhint("A relation has an associated type of the same name, "
"so you must use a name that doesn't conflict "
"with any existing type.")));
}
/*
* Validate shared/non-shared tablespace (must check this before doing
* GetNewRelFileNode, to prevent Assert therein)
*/
if (shared_relation)
{
if (reltablespace != GLOBALTABLESPACE_OID)
/* elog since this is not a user-facing error */
elog(ERROR,
"shared relations must be placed in pg_global tablespace");
}
else
{
if (reltablespace == GLOBALTABLESPACE_OID)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("only shared relations can be placed in pg_global tablespace")));
}
/*
* Allocate an OID for the relation, unless we were told what to use.
*
* The OID will be the relfilenode as well, so make sure it doesn't
* collide with either pg_class OIDs or existing physical files.
*/
if (!OidIsValid(relid))
relid = GetNewRelFileNode(reltablespace, shared_relation,
pg_class_desc);
/*
2005-10-15 04:49:52 +02:00
* Create the relcache entry (mostly dummy at this point) and the physical
* disk file. (If we fail further down, it's the smgr's responsibility to
* remove the disk file again.)
*/
new_rel_desc = heap_create(relname,
relnamespace,
reltablespace,
relid,
tupdesc,
relkind,
shared_relation,
allow_system_table_mods);
Assert(relid == RelationGetRelid(new_rel_desc));
/*
2007-11-15 22:14:46 +01:00
* Decide whether to create an array type over the relation's rowtype. We
* do not create any array types for system catalogs (ie, those made
* during initdb). We create array types for regular relations, views,
* and composite types ... but not, eg, for toast tables or sequences.
*/
if (IsUnderPostmaster && (relkind == RELKIND_RELATION ||
relkind == RELKIND_VIEW ||
relkind == RELKIND_COMPOSITE_TYPE))
{
/* OK, so pre-assign a type OID for the array type */
2007-11-15 22:14:46 +01:00
Relation pg_type = heap_open(TypeRelationId, AccessShareLock);
new_array_oid = GetNewOid(pg_type);
heap_close(pg_type, AccessShareLock);
}
/*
* Since defining a relation also defines a complex type, we add a new
* system type corresponding to the new relation.
*
* NOTE: we could get a unique-index failure here, in case someone else is
2007-11-15 22:14:46 +01:00
* creating the same type name in parallel but hadn't committed yet when
* we checked for a duplicate name above.
*/
new_type_oid = AddNewRelationType(relname,
relnamespace,
relid,
relkind,
2007-11-15 22:14:46 +01:00
new_array_oid);
/*
* Now make the array type if wanted.
*/
if (OidIsValid(new_array_oid))
{
char *relarrayname;
relarrayname = makeArrayTypeName(relname, relnamespace);
TypeCreate(new_array_oid, /* force the type's OID to this */
2007-11-15 22:14:46 +01:00
relarrayname, /* Array type name */
relnamespace, /* Same namespace as parent */
InvalidOid, /* Not composite, no relationOid */
0, /* relkind, also N/A here */
-1, /* Internal size (varlena) */
TYPTYPE_BASE, /* Not composite - typelem is */
TYPCATEGORY_ARRAY, /* type-category (array) */
false, /* array types are never preferred */
DEFAULT_TYPDELIM, /* default array delimiter */
2007-11-15 22:14:46 +01:00
F_ARRAY_IN, /* array input proc */
F_ARRAY_OUT, /* array output proc */
F_ARRAY_RECV, /* array recv (bin) proc */
F_ARRAY_SEND, /* array send (bin) proc */
InvalidOid, /* typmodin procedure - none */
InvalidOid, /* typmodout procedure - none */
InvalidOid, /* analyze procedure - default */
new_type_oid, /* array element type - the rowtype */
true, /* yes, this is an array type */
InvalidOid, /* this has no array type */
InvalidOid, /* domain base type - irrelevant */
NULL, /* default value - none */
NULL, /* default binary representation */
false, /* passed by reference */
'd', /* alignment - must be the largest! */
'x', /* fully TOASTable */
-1, /* typmod */
0, /* array dimensions for typBaseType */
false); /* Type NOT NULL */
pfree(relarrayname);
}
/*
 * Now create an entry in pg_class for the relation.
 *
 * NOTE: we could get a unique-index failure here, in case someone else is
 * creating the same relation name in parallel but hadn't committed yet
 * when we checked for a duplicate name above.
 */
AddNewRelationTuple(pg_class_desc,
1999-05-25 18:15:34 +02:00
new_rel_desc,
relid,
new_type_oid,
ownerid,
relkind,
reloptions);
/*
 * Now add tuples to pg_attribute for the attributes in our new relation.
 */
AddNewAttributeTuples(relid, new_rel_desc->rd_att, relkind,
oidislocal, oidinhcount);
/*
* Make a dependency link to force the relation to be deleted if its
* namespace is. Also make a dependency link to its owner.
*
* For composite types, these dependencies are tracked for the pg_type
* entry, so we needn't record them here. Likewise, TOAST tables don't
* need a namespace dependency (they live in a pinned namespace) nor an
* owner dependency (they depend indirectly through the parent table).
* Also, skip this in bootstrap mode, since we don't make dependencies
* while bootstrapping.
*/
if (relkind != RELKIND_COMPOSITE_TYPE &&
relkind != RELKIND_TOASTVALUE &&
!IsBootstrapProcessingMode())
{
2002-09-04 22:31:48 +02:00
ObjectAddress myself,
referenced;
myself.classId = RelationRelationId;
myself.objectId = relid;
myself.objectSubId = 0;
referenced.classId = NamespaceRelationId;
referenced.objectId = relnamespace;
referenced.objectSubId = 0;
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
recordDependencyOnOwner(RelationRelationId, relid, ownerid);
}
/*
* Store any supplied constraints and defaults.
*
* NB: this may do a CommandCounterIncrement and rebuild the relcache
* entry, so the relation must be valid and self-consistent at this point.
* In particular, there are not yet constraints and defaults anywhere.
*/
StoreConstraints(new_rel_desc, cooked_constraints);
/*
* If there's a special on-commit action, remember it
*/
if (oncommit != ONCOMMIT_NOOP)
register_on_commit_action(relid, oncommit);
/*
 * OK, the relation has been cataloged, so close our relations and return
 * the OID of the newly created relation.
 */
heap_close(new_rel_desc, NoLock); /* do not unlock till end of xact */
heap_close(pg_class_desc, RowExclusiveLock);
return relid;
}
/*
 *		RelationRemoveInheritance
 *
 * Delete every pg_inherits row whose inhrelid is the given relation, that
 * is, the rows linking this relation to its parent(s).
 *
 * This routine used to look for child relations and abort the deletion if
 * any existed.  That job now belongs to the dependency mechanism, which
 * checks for or deletes children beforehand; by the time control arrives
 * here no children remain, so only the links to the parents are removed.
 */
static void
RelationRemoveInheritance(Oid relid)
{
	Relation	inhRel = heap_open(InheritsRelationId, RowExclusiveLock);
	ScanKeyData skey[1];
	SysScanDesc inhScan;
	HeapTuple	inhTup;

	/* Scan condition: pg_inherits.inhrelid = relid */
	ScanKeyInit(&skey[0],
				Anum_pg_inherits_inhrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(relid));

	inhScan = systable_beginscan(inhRel, InheritsRelidSeqnoIndexId, true,
								 SnapshotNow, 1, skey);

	/* Delete each row the scan returns */
	for (;;)
	{
		inhTup = systable_getnext(inhScan);
		if (!HeapTupleIsValid(inhTup))
			break;
		simple_heap_delete(inhRel, &inhTup->t_self);
	}

	systable_endscan(inhScan);
	heap_close(inhRel, RowExclusiveLock);
}
/*
 *		DeleteRelationTuple
 *
 * Delete the pg_class row for relid.  Raises an error if the syscache
 * lookup for that OID fails.
 *
 * Note: both relation deletion and index deletion call this.  No other
 * callers are intended.
 */
void
DeleteRelationTuple(Oid relid)
{
	/* Open pg_class with the lock level needed to modify it */
	Relation	classRel = heap_open(RelationRelationId, RowExclusiveLock);
	HeapTuple	classTup = SearchSysCache(RELOID,
										  ObjectIdGetDatum(relid),
										  0, 0, 0);

	if (!HeapTupleIsValid(classTup))
		elog(ERROR, "cache lookup failed for relation %u", relid);

	/* Remove the row, then give back the syscache reference */
	simple_heap_delete(classRel, &classTup->t_self);
	ReleaseSysCache(classTup);

	heap_close(classRel, RowExclusiveLock);
}
/*
 *		DeleteAttributeTuples
 *
 * Remove pg_attribute rows for the given relid.
 *
 * All rows whose attrelid equals relid are deleted; they are located with
 * a scan on AttributeRelidNumIndexId keyed on attrelid only.
 *
 * Note: this is shared by relation deletion and index deletion.  It's
 * not intended for use anyplace else.
 */
void
DeleteAttributeTuples(Oid relid)
{
	Relation	attrel;
	SysScanDesc scan;
	ScanKeyData key[1];
	HeapTuple	atttup;

	/* Grab an appropriate lock on the pg_attribute relation */
	attrel = heap_open(AttributeRelationId, RowExclusiveLock);

	/* Use the index to scan only attributes of the target relation */
	ScanKeyInit(&key[0],
				Anum_pg_attribute_attrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(relid));

	scan = systable_beginscan(attrel, AttributeRelidNumIndexId, true,
							  SnapshotNow, 1, key);

	/* Delete all the matching tuples */
	while ((atttup = systable_getnext(scan)) != NULL)
		simple_heap_delete(attrel, &atttup->t_self);

	/* Clean up after the scan */
	systable_endscan(scan);
	heap_close(attrel, RowExclusiveLock);
}
/*
 *		RemoveAttributeById
 *
 * This is the guts of ALTER TABLE DROP COLUMN: actually mark the attribute
 * deleted in pg_attribute.  We also remove pg_statistic entries for it.
 * (Everything else needed, such as getting rid of any pg_attrdef entry,
 * is handled by dependency.c.)
 *
 * relid:  OID of the relation that owns the column
 * attnum: attribute number of the column.  A negative number (system
 *		   attribute) causes the pg_attribute row to be deleted outright;
 *		   otherwise the row is kept and marked as dropped.
 *
 * Raises an error if no pg_attribute entry exists for (relid, attnum).
 */
void
RemoveAttributeById(Oid relid, AttrNumber attnum)
{
	Relation	rel;
	Relation	attr_rel;
	HeapTuple	tuple;
	Form_pg_attribute attStruct;
	char		newattname[NAMEDATALEN];

	/*
	 * Grab an exclusive lock on the target table, which we will NOT release
	 * until end of transaction.  (In the simple case where we are directly
	 * dropping this column, AlterTableDropColumn already did this ... but
	 * when cascading from a drop of some other object, we may not have any
	 * lock.)
	 */
	rel = relation_open(relid, AccessExclusiveLock);

	attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);

	/* Fetch a modifiable copy of the column's pg_attribute row */
	tuple = SearchSysCacheCopy(ATTNUM,
							   ObjectIdGetDatum(relid),
							   Int16GetDatum(attnum),
							   0, 0);
	if (!HeapTupleIsValid(tuple))		/* shouldn't happen */
		elog(ERROR, "cache lookup failed for attribute %d of relation %u",
			 attnum, relid);
	attStruct = (Form_pg_attribute) GETSTRUCT(tuple);

	if (attnum < 0)
	{
		/* System attribute (probably OID) ... just delete the row */

		simple_heap_delete(attr_rel, &tuple->t_self);
	}
	else
	{
		/* Dropping user attributes is lots harder */

		/* Mark the attribute as dropped */
		attStruct->attisdropped = true;

		/*
		 * Set the type OID to invalid.  A dropped attribute's type link
		 * cannot be relied on (once the attribute is dropped, the type might
		 * be too).  Fortunately we do not need the type row --- the only
		 * really essential information is the type's typlen and typalign,
		 * which are preserved in the attribute's attlen and attalign.  We set
		 * atttypid to zero here as a means of catching code that incorrectly
		 * expects it to be valid.
		 */
		attStruct->atttypid = InvalidOid;

		/* Remove any NOT NULL constraint the column may have */
		attStruct->attnotnull = false;

		/* We don't want to keep stats for it anymore */
		attStruct->attstattarget = 0;

		/*
		 * Change the column name to something that isn't likely to conflict
		 */
		snprintf(newattname, sizeof(newattname),
				 "........pg.dropped.%d........", attnum);
		namestrcpy(&(attStruct->attname), newattname);

		simple_heap_update(attr_rel, &tuple->t_self, tuple);

		/* keep the system catalog indexes current */
		CatalogUpdateIndexes(attr_rel, tuple);
	}

	/*
	 * Because updating the pg_attribute row will trigger a relcache flush for
	 * the target relation, we need not do anything else to notify other
	 * backends of the change.
	 */

	heap_close(attr_rel, RowExclusiveLock);

	/* Statistics are only removed for positive attribute numbers */
	if (attnum > 0)
		RemoveStatistics(relid, attnum);

	/* Close the relcache entry but keep the lock taken above */
	relation_close(rel, NoLock);
}
/*
 *		RemoveAttrDefault
 *
 * If the specified relation/attribute has a default, remove it.
 * (If no default, raise error if complain is true, else return quietly.)
 *
 * relid:	 OID of the relation owning the column
 * attnum:	 attribute number of the column
 * behavior: drop behavior handed through to performDeletion()
 * complain: whether a missing pg_attrdef entry is an error
 *
 * The pg_attrdef row itself is not deleted here; each matching entry is
 * passed to performDeletion() as an object of class AttrDefaultRelationId.
 */
void
RemoveAttrDefault(Oid relid, AttrNumber attnum,
				  DropBehavior behavior, bool complain)
{
	Relation	attrdef_rel;
	ScanKeyData scankeys[2];
	SysScanDesc scan;
	HeapTuple	tuple;
	bool		found = false;

	attrdef_rel = heap_open(AttrDefaultRelationId, RowExclusiveLock);

	/* Look up pg_attrdef entries by (adrelid, adnum) */
	ScanKeyInit(&scankeys[0],
				Anum_pg_attrdef_adrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(relid));
	ScanKeyInit(&scankeys[1],
				Anum_pg_attrdef_adnum,
				BTEqualStrategyNumber, F_INT2EQ,
				Int16GetDatum(attnum));

	scan = systable_beginscan(attrdef_rel, AttrDefaultIndexId, true,
							  SnapshotNow, 2, scankeys);

	/* There should be at most one matching tuple, but we loop anyway */
	while (HeapTupleIsValid(tuple = systable_getnext(scan)))
	{
		ObjectAddress object;

		object.classId = AttrDefaultRelationId;
		object.objectId = HeapTupleGetOid(tuple);
		object.objectSubId = 0;

		performDeletion(&object, behavior);

		found = true;
	}

	systable_endscan(scan);
	heap_close(attrdef_rel, RowExclusiveLock);

	if (complain && !found)
		elog(ERROR, "could not find attrdef tuple for relation %u attnum %d",
			 relid, attnum);
}
/*
 *		RemoveAttrDefaultById
 *
 * Remove a pg_attrdef entry specified by OID.  This is the guts of
 * attribute-default removal.  Note it should be called via performDeletion,
 * not directly.
 *
 * attrdefId: OID of the pg_attrdef row to remove
 *
 * Besides deleting the pg_attrdef row, this clears atthasdef in the owning
 * column's pg_attribute row.  Raises an error if either row cannot be found.
 */
void
RemoveAttrDefaultById(Oid attrdefId)
{
	Relation	attrdef_rel;
	Relation	attr_rel;
	Relation	myrel;
	ScanKeyData scankeys[1];
	SysScanDesc scan;
	HeapTuple	tuple;
	Oid			myrelid;
	AttrNumber	myattnum;

	/* Grab an appropriate lock on the pg_attrdef relation */
	attrdef_rel = heap_open(AttrDefaultRelationId, RowExclusiveLock);

	/* Find the pg_attrdef tuple */
	ScanKeyInit(&scankeys[0],
				ObjectIdAttributeNumber,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(attrdefId));

	scan = systable_beginscan(attrdef_rel, AttrDefaultOidIndexId, true,
							  SnapshotNow, 1, scankeys);

	tuple = systable_getnext(scan);
	if (!HeapTupleIsValid(tuple))
		elog(ERROR, "could not find tuple for attrdef %u", attrdefId);

	/* Remember which column this default belongs to */
	myrelid = ((Form_pg_attrdef) GETSTRUCT(tuple))->adrelid;
	myattnum = ((Form_pg_attrdef) GETSTRUCT(tuple))->adnum;

	/* Get an exclusive lock on the relation owning the attribute */
	myrel = relation_open(myrelid, AccessExclusiveLock);

	/* Now we can delete the pg_attrdef row */
	simple_heap_delete(attrdef_rel, &tuple->t_self);

	systable_endscan(scan);
	heap_close(attrdef_rel, RowExclusiveLock);

	/* Fix the pg_attribute row */
	attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);

	tuple = SearchSysCacheCopy(ATTNUM,
							   ObjectIdGetDatum(myrelid),
							   Int16GetDatum(myattnum),
							   0, 0);
	if (!HeapTupleIsValid(tuple))		/* shouldn't happen */
		elog(ERROR, "cache lookup failed for attribute %d of relation %u",
			 myattnum, myrelid);

	((Form_pg_attribute) GETSTRUCT(tuple))->atthasdef = false;

	simple_heap_update(attr_rel, &tuple->t_self, tuple);

	/* keep the system catalog indexes current */
	CatalogUpdateIndexes(attr_rel, tuple);

	/*
	 * Our update of the pg_attribute row will force a relcache rebuild, so
	 * there's nothing else to do here.
	 */
	heap_close(attr_rel, RowExclusiveLock);

	/* Keep lock on attribute's rel until end of xact */
	relation_close(myrel, NoLock);
}
/*
 * heap_drop_with_catalog	- removes specified relation from catalogs
 *
 * Note that this routine is not responsible for dropping objects that are
 * linked to the pg_class entry via dependencies (for example, indexes and
 * constraints).  Those are deleted by the dependency-tracing logic in
 * dependency.c before control gets here.  In general, therefore, this routine
 * should never be called directly; go through performDeletion() instead.
 *
 * relid: OID of the relation to drop
 *
 * Steps, in order: schedule the physical file for unlinking (skipped for
 * views and composite types), forget any ON COMMIT action, flush the
 * relcache entry, then remove the pg_inherits, pg_statistic, pg_attribute
 * and pg_class rows.
 */
void
heap_drop_with_catalog(Oid relid)
{
	Relation	rel;

	/*
	 * Open and lock the relation.
	 */
	rel = relation_open(relid, AccessExclusiveLock);

	/*
	 * Schedule unlinking of the relation's physical file at commit.
	 */
	if (rel->rd_rel->relkind != RELKIND_VIEW &&
		rel->rd_rel->relkind != RELKIND_COMPOSITE_TYPE)
	{
		RelationOpenSmgr(rel);
		smgrscheduleunlink(rel->rd_smgr, rel->rd_istemp);
	}

	/*
	 * Close relcache entry, but *keep* AccessExclusiveLock on the relation
	 * until transaction commit.  This ensures no one else will try to do
	 * something with the doomed relation.
	 */
	relation_close(rel, NoLock);

	/*
	 * Forget any ON COMMIT action for the rel
	 */
	remove_on_commit_action(relid);

	/*
	 * Flush the relation from the relcache.  We want to do this before
	 * starting to remove catalog entries, just to be certain that no relcache
	 * entry rebuild will happen partway through.  (That should not really
	 * matter, since we don't do CommandCounterIncrement here, but let's be
	 * safe.)
	 */
	RelationForgetRelation(relid);

	/*
	 * remove inheritance information
	 */
	RelationRemoveInheritance(relid);

	/*
	 * delete statistics
	 */
	RemoveStatistics(relid, 0);

	/*
	 * delete attribute tuples
	 */
	DeleteAttributeTuples(relid);

	/*
	 * delete relation tuple
	 */
	DeleteRelationTuple(relid);
}
/*
 * Store a default expression for column attnum of relation rel.
 *
 * rel:	   relation owning the column
 * attnum: attribute number of the column
 * expr:   the default expression, as a node tree
 *
 * A pg_attrdef row is inserted holding both the nodeToString() form and
 * the deparsed text of the expression, atthasdef is set in the column's
 * pg_attribute row if it was not set already, and dependencies are
 * recorded from the new pg_attrdef entry to the column and to the objects
 * used in the expression.
 */
void
StoreAttrDefault(Relation rel, AttrNumber attnum, Node *expr)
{
	char	   *adbin;
	char	   *adsrc;
	Relation	adrel;
	HeapTuple	tuple;
	Datum		values[4];
	static char nulls[4] = {' ', ' ', ' ', ' '};
	Relation	attrrel;
	HeapTuple	atttup;
	Form_pg_attribute attStruct;
	Oid			attrdefOid;
	ObjectAddress colobject,
				defobject;

	/*
	 * Flatten expression to string form for storage.
	 */
	adbin = nodeToString(expr);

	/*
	 * Also deparse it to form the mostly-obsolete adsrc field.
	 */
	adsrc = deparse_expression(expr,
							deparse_context_for(RelationGetRelationName(rel),
												RelationGetRelid(rel)),
							   false, false);

	/*
	 * Make the pg_attrdef entry.  The OID and attribute number go through
	 * the Datum conversion macros, as the syscache lookup below does.
	 */
	values[Anum_pg_attrdef_adrelid - 1] = ObjectIdGetDatum(RelationGetRelid(rel));
	values[Anum_pg_attrdef_adnum - 1] = Int16GetDatum(attnum);
	values[Anum_pg_attrdef_adbin - 1] = CStringGetTextDatum(adbin);
	values[Anum_pg_attrdef_adsrc - 1] = CStringGetTextDatum(adsrc);

	adrel = heap_open(AttrDefaultRelationId, RowExclusiveLock);

	tuple = heap_formtuple(adrel->rd_att, values, nulls);
	attrdefOid = simple_heap_insert(adrel, tuple);

	CatalogUpdateIndexes(adrel, tuple);

	/* Remember the new entry's address for the dependency records below */
	defobject.classId = AttrDefaultRelationId;
	defobject.objectId = attrdefOid;
	defobject.objectSubId = 0;

	heap_close(adrel, RowExclusiveLock);

	/* now can free some of the stuff allocated above */
	pfree(DatumGetPointer(values[Anum_pg_attrdef_adbin - 1]));
	pfree(DatumGetPointer(values[Anum_pg_attrdef_adsrc - 1]));
	heap_freetuple(tuple);
	pfree(adbin);
	pfree(adsrc);

	/*
	 * Update the pg_attribute entry for the column to show that a default
	 * exists.
	 */
	attrrel = heap_open(AttributeRelationId, RowExclusiveLock);
	atttup = SearchSysCacheCopy(ATTNUM,
								ObjectIdGetDatum(RelationGetRelid(rel)),
								Int16GetDatum(attnum),
								0, 0);
	if (!HeapTupleIsValid(atttup))
		elog(ERROR, "cache lookup failed for attribute %d of relation %u",
			 attnum, RelationGetRelid(rel));
	attStruct = (Form_pg_attribute) GETSTRUCT(atttup);
	if (!attStruct->atthasdef)
	{
		attStruct->atthasdef = true;
		simple_heap_update(attrrel, &atttup->t_self, atttup);
		/* keep catalog indexes current */
		CatalogUpdateIndexes(attrrel, atttup);
	}
	heap_close(attrrel, RowExclusiveLock);
	heap_freetuple(atttup);

	/*
	 * Make a dependency so that the pg_attrdef entry goes away if the column
	 * (or whole table) is deleted.
	 */
	colobject.classId = RelationRelationId;
	colobject.objectId = RelationGetRelid(rel);
	colobject.objectSubId = attnum;

	recordDependencyOn(&defobject, &colobject, DEPENDENCY_AUTO);

	/*
	 * Record dependencies on objects used in the expression, too.
	 */
	recordDependencyOnExpr(&defobject, expr, NIL, DEPENDENCY_NORMAL);
}
/*
 * Store a check-constraint expression for the given relation.
 *
 * rel:		 relation the constraint belongs to
 * ccname:	 constraint name
 * expr:	 the check expression, as a node tree
 * is_local: value to store as conislocal
 * inhcount: value to store as coninhcount
 *
 * Caller is responsible for updating the count of constraints
 * in the pg_class entry for the relation.
 */
static void
StoreRelCheck(Relation rel, char *ccname, Node *expr,
			  bool is_local, int inhcount)
{
	char	   *ccbin;
	char	   *ccsrc;
	List	   *varList;
	int			keycount;
	int16	   *attNos;

	/*
	 * Flatten expression to string form for storage.
	 */
	ccbin = nodeToString(expr);

	/*
	 * Also deparse it to form the mostly-obsolete consrc field.
	 */
	ccsrc = deparse_expression(expr,
							deparse_context_for(RelationGetRelationName(rel),
												RelationGetRelid(rel)),
							   false, false);

	/*
	 * Find columns of rel that are used in expr
	 *
	 * NB: pull_var_clause is okay here only because we don't allow subselects
	 * in check constraints; it would fail to examine the contents of
	 * subselects.
	 */
	varList = pull_var_clause(expr, false);
	keycount = list_length(varList);

	if (keycount > 0)
	{
		ListCell   *vl;
		int			i = 0;

		attNos = (int16 *) palloc(keycount * sizeof(int16));
		foreach(vl, varList)
		{
			Var		   *var = (Var *) lfirst(vl);
			int			j;

			/* Record each attribute number once, skipping repeats */
			for (j = 0; j < i; j++)
				if (attNos[j] == var->varattno)
					break;
			if (j == i)
				attNos[i++] = var->varattno;
		}
		/* keycount now counts distinct attribute numbers only */
		keycount = i;
	}
	else
		attNos = NULL;

	/*
	 * Create the Check Constraint
	 */
	CreateConstraintEntry(ccname,		/* Constraint Name */
						  RelationGetNamespace(rel),	/* namespace */
						  CONSTRAINT_CHECK,		/* Constraint Type */
						  false,	/* Is Deferrable */
						  false,	/* Is Deferred */
						  RelationGetRelid(rel),		/* relation */
						  attNos,		/* attrs in the constraint */
						  keycount,		/* # attrs in the constraint */
						  InvalidOid,	/* not a domain constraint */
						  InvalidOid,	/* Foreign key fields */
						  NULL,
						  NULL,
						  NULL,
						  NULL,
						  0,
						  ' ',
						  ' ',
						  ' ',
						  InvalidOid,	/* no associated index */
						  expr, /* Tree form check constraint */
						  ccbin,	/* Binary form check constraint */
						  ccsrc,	/* Source form check constraint */
						  is_local,		/* conislocal */
						  inhcount);	/* coninhcount */

	pfree(ccbin);
	pfree(ccsrc);
}
/*
 * Store defaults and constraints (passed as a list of CookedConstraint).
 *
 * CONSTR_DEFAULT entries go to StoreAttrDefault and CONSTR_CHECK entries
 * to StoreRelCheck; any other constraint type raises an error.  When at
 * least one check constraint was stored, the relation's check count is
 * set to the number stored here.
 *
 * NOTE: only pre-cooked expressions arrive by this route, that is,
 * expressions inherited from an existing relation.  Newly parsed
 * expressions can be added later through direct calls to StoreAttrDefault
 * and StoreRelCheck (see AddRelationNewConstraints()).
 */
static void
StoreConstraints(Relation rel, List *cooked_constraints)
{
	ListCell   *item;
	int			nchecks = 0;

	/* An empty list means there is nothing to do */
	if (cooked_constraints == NULL)
		return;

	/*
	 * The relation's just-created pg_attribute tuples have to be visible,
	 * or deparsing the constraint expressions will fail.  Bumping the
	 * command counter achieves that.  CAUTION: this causes a relcache entry
	 * rebuild.
	 */
	CommandCounterIncrement();

	foreach(item, cooked_constraints)
	{
		CookedConstraint *cc = (CookedConstraint *) lfirst(item);

		if (cc->contype == CONSTR_DEFAULT)
		{
			StoreAttrDefault(rel, cc->attnum, cc->expr);
		}
		else if (cc->contype == CONSTR_CHECK)
		{
			StoreRelCheck(rel, cc->name, cc->expr,
						  cc->is_local, cc->inhcount);
			nchecks++;
		}
		else
		{
			elog(ERROR, "unrecognized constraint type: %d",
				 (int) cc->contype);
		}
	}

	if (nchecks > 0)
		SetRelationNumChecks(rel, nchecks);
}
/*
 * AddRelationNewConstraints
 *
 * Add new column default expressions and/or constraint check expressions
 * to an existing relation.  This is defined to do both for efficiency in
 * DefineRelation, but of course you can do just one or the other by passing
 * empty lists.
 *
 * rel: relation to be modified
 * newColDefaults: list of RawColumnDefault structures
 * newConstraints: list of Constraint nodes
 * allow_merge: TRUE if check constraints may be merged with existing ones
 * is_local: TRUE if definition is local, FALSE if it's inherited
 *
 * All entries in newColDefaults will be processed.  Entries in newConstraints
 * will be processed only if they are CONSTR_CHECK type.
 *
 * Returns a list of CookedConstraint nodes that shows the cooked form of
 * the default and constraint expressions added to the relation.  Defaults
 * that cook down to nothing or to a bare NULL constant, and named check
 * constraints that were merged into an existing one, are not included.
 *
 * NB: caller should have opened rel with AccessExclusiveLock, and should
 * hold that lock till end of transaction.	Also, we assume the caller has
 * done a CommandCounterIncrement if necessary to make the relation's catalog
 * tuples visible.
 */
List *
AddRelationNewConstraints(Relation rel,
						  List *newColDefaults,
						  List *newConstraints,
						  bool allow_merge,
						  bool is_local)
{
	List	   *cookedConstraints = NIL;
	TupleDesc	tupleDesc;
	TupleConstr *oldconstr;
	int			numoldchecks;
	ParseState *pstate;
	RangeTblEntry *rte;
	int			numchecks;
	List	   *checknames;
	ListCell   *cell;
	Node	   *expr;
	CookedConstraint *cooked;

	/*
	 * Get info about existing constraints.
	 */
	tupleDesc = RelationGetDescr(rel);
	oldconstr = tupleDesc->constr;
	if (oldconstr)
		numoldchecks = oldconstr->num_check;
	else
		numoldchecks = 0;

	/*
	 * Create a dummy ParseState and insert the target relation as its sole
	 * rangetable entry.  We need a ParseState for transformExpr.
	 */
	pstate = make_parsestate(NULL);
	rte = addRangeTableEntryForRelation(pstate,
										rel,
										NULL,
										false,
										true);
	addRTEtoQuery(pstate, rte, true, true, true);

	/*
	 * Process column default expressions.
	 */
	foreach(cell, newColDefaults)
	{
		RawColumnDefault *colDef = (RawColumnDefault *) lfirst(cell);
		Form_pg_attribute atp = rel->rd_att->attrs[colDef->attnum - 1];

		expr = cookDefault(pstate, colDef->raw_default,
						   atp->atttypid, atp->atttypmod,
						   NameStr(atp->attname));

		/*
		 * If the expression is just a NULL constant, we do not bother to make
		 * an explicit pg_attrdef entry, since the default behavior is
		 * equivalent.
		 *
		 * Note a nonobvious property of this test: if the column is of a
		 * domain type, what we'll get is not a bare null Const but a
		 * CoerceToDomain expr, so we will not discard the default.  This is
		 * critical because the column default needs to be retained to
		 * override any default that the domain might have.
		 */
		if (expr == NULL ||
			(IsA(expr, Const) && ((Const *) expr)->constisnull))
			continue;

		StoreAttrDefault(rel, colDef->attnum, expr);

		cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
		cooked->contype = CONSTR_DEFAULT;
		cooked->name = NULL;
		cooked->attnum = colDef->attnum;
		cooked->expr = expr;
		cooked->is_local = is_local;
		cooked->inhcount = is_local ? 0 : 1;
		cookedConstraints = lappend(cookedConstraints, cooked);
	}

	/*
	 * Process constraint expressions.
	 */
	numchecks = numoldchecks;
	checknames = NIL;
	foreach(cell, newConstraints)
	{
		Constraint *cdef = (Constraint *) lfirst(cell);
		char	   *ccname;

		if (cdef->contype != CONSTR_CHECK)
			continue;

		if (cdef->raw_expr != NULL)
		{
			Assert(cdef->cooked_expr == NULL);

			/*
			 * Transform raw parsetree to executable expression, and verify
			 * it's valid as a CHECK constraint.
			 */
			expr = cookConstraint(pstate, cdef->raw_expr,
								  RelationGetRelationName(rel));
		}
		else
		{
			Assert(cdef->cooked_expr != NULL);

			/*
			 * Here, we assume the parser will only pass us valid CHECK
			 * expressions, so we do no particular checking.
			 */
			expr = stringToNode(cdef->cooked_expr);
		}

		/*
		 * Check name uniqueness, or generate a name if none was given.
		 */
		if (cdef->name != NULL)
		{
			ListCell   *cell2;

			ccname = cdef->name;
			/* Check against other new constraints */
			/* Needed because we don't do CommandCounterIncrement in loop */
			foreach(cell2, checknames)
			{
				if (strcmp((char *) lfirst(cell2), ccname) == 0)
					ereport(ERROR,
							(errcode(ERRCODE_DUPLICATE_OBJECT),
							 errmsg("check constraint \"%s\" already exists",
									ccname)));
			}

			/* save name for future checks */
			checknames = lappend(checknames, ccname);

			/*
			 * Check against pre-existing constraints.  If we are allowed
			 * to merge with an existing constraint, there's no more to
			 * do here.  (We omit the duplicate constraint from the result,
			 * which is what ATAddCheckConstraint wants.)
			 */
			if (MergeWithExistingConstraint(rel, ccname, expr,
											allow_merge, is_local))
				continue;
		}
		else
		{
			/*
			 * When generating a name, we want to create "tab_col_check" for a
			 * column constraint and "tab_check" for a table constraint.  We
			 * no longer have any info about the syntactic positioning of the
			 * constraint phrase, so we approximate this by seeing whether the
			 * expression references more than one column.	(If the user
			 * played by the rules, the result is the same...)
			 *
			 * Note: pull_var_clause() doesn't descend into sublinks, but we
			 * eliminated those above; and anyway this only needs to be an
			 * approximate answer.
			 */
			List	   *vars;
			char	   *colname;

			vars = pull_var_clause(expr, false);

			/* eliminate duplicates */
			vars = list_union(NIL, vars);

			if (list_length(vars) == 1)
				colname = get_attname(RelationGetRelid(rel),
									  ((Var *) linitial(vars))->varattno);
			else
				colname = NULL;

			ccname = ChooseConstraintName(RelationGetRelationName(rel),
										  colname,
										  "check",
										  RelationGetNamespace(rel),
										  checknames);

			/* save name for future checks */
			checknames = lappend(checknames, ccname);
		}

		/*
		 * OK, store it.
		 */
		StoreRelCheck(rel, ccname, expr, is_local, is_local ? 0 : 1);

		numchecks++;

		cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
		cooked->contype = CONSTR_CHECK;
		cooked->name = ccname;
		cooked->attnum = 0;
		cooked->expr = expr;
		cooked->is_local = is_local;
		cooked->inhcount = is_local ? 0 : 1;
		cookedConstraints = lappend(cookedConstraints, cooked);
	}

	/*
	 * Update the count of constraints in the relation's pg_class tuple. We do
	 * this even if there was no change, in order to ensure that an SI update
	 * message is sent out for the pg_class tuple, which will force other
	 * backends to rebuild their relcache entries for the rel. (This is
	 * critical if we added defaults but not constraints.)
	 */
	SetRelationNumChecks(rel, numchecks);

	return cookedConstraints;
}
/*
 * Look for an existing constraint on rel that has the proposed name.  If
 * there is one, either merge the new definition into it by adjusting its
 * conislocal/coninhcount settings, or raise an error.
 *
 * The candidates are the pg_constraint rows with the same name and
 * namespace; the first one whose conrelid is rel decides the outcome.
 * Merging is permitted only when allow_merge is true and that row is a
 * check constraint whose stored expression equals expr.
 *
 * Returns TRUE if merged (the constraint is a duplicate), FALSE if the
 * name is so far unused on rel; throws an error on a conflict.
 */
static bool
MergeWithExistingConstraint(Relation rel, char *ccname, Node *expr,
							bool allow_merge, bool is_local)
{
	bool		merged = false;
	Relation	conRel;
	SysScanDesc conScan;
	ScanKeyData keys[2];
	HeapTuple	conTup;

	conRel = heap_open(ConstraintRelationId, RowExclusiveLock);

	/* Scan for pg_constraint rows with this name in the rel's namespace */
	ScanKeyInit(&keys[0],
				Anum_pg_constraint_conname,
				BTEqualStrategyNumber, F_NAMEEQ,
				CStringGetDatum(ccname));
	ScanKeyInit(&keys[1],
				Anum_pg_constraint_connamespace,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetNamespace(rel)));

	conScan = systable_beginscan(conRel, ConstraintNameNspIndexId, true,
								 SnapshotNow, 2, keys);

	for (;;)
	{
		Form_pg_constraint conForm;

		conTup = systable_getnext(conScan);
		if (!HeapTupleIsValid(conTup))
			break;

		/* Skip same-named constraints that belong to other relations */
		conForm = (Form_pg_constraint) GETSTRUCT(conTup);
		if (conForm->conrelid != RelationGetRelid(rel))
			continue;

		/* Same name on our rel: a duplicate only if an equal CHECK */
		if (conForm->contype == CONSTRAINT_CHECK)
		{
			bool		conbinIsNull;
			Datum		conbin = fastgetattr(conTup,
											 Anum_pg_constraint_conbin,
											 conRel->rd_att, &conbinIsNull);

			if (conbinIsNull)
				elog(ERROR, "null conbin for rel %s",
					 RelationGetRelationName(rel));
			merged = equal(expr, stringToNode(TextDatumGetCString(conbin)));
		}

		if (!(merged && allow_merge))
			ereport(ERROR,
					(errcode(ERRCODE_DUPLICATE_OBJECT),
				errmsg("constraint \"%s\" for relation \"%s\" already exists",
					   ccname, RelationGetRelationName(rel))));

		ereport(NOTICE,
		   (errmsg("merging constraint \"%s\" with inherited definition",
				   ccname)));

		/* Modify a private copy of the row and write it back */
		conTup = heap_copytuple(conTup);
		conForm = (Form_pg_constraint) GETSTRUCT(conTup);
		if (is_local)
			conForm->conislocal = true;
		else
			conForm->coninhcount++;
		simple_heap_update(conRel, &conTup->t_self, conTup);
		CatalogUpdateIndexes(conRel, conTup);

		/* Only the first match on our rel is considered */
		break;
	}

	systable_endscan(conScan);
	heap_close(conRel, RowExclusiveLock);

	return merged;
}
/*
 * Set relchecks, the count of check constraints, in the relation's
 * pg_class tuple.
 *
 * The caller had better hold exclusive lock on the relation.
 *
 * An important side effect: an SI update message goes out for the pg_class
 * tuple, which forces other backends to rebuild their relcache entries for
 * the rel.  This backend will also rebuild its own relcache entry at the
 * next CommandCounterIncrement.  When the stored count already matches,
 * the tuple is left alone and a relcache invalidation is requested
 * explicitly instead.
 */
static void
SetRelationNumChecks(Relation rel, int numchecks)
{
	Relation	pgClass = heap_open(RelationRelationId, RowExclusiveLock);
	HeapTuple	classTup;
	Form_pg_class classForm;

	classTup = SearchSysCacheCopy(RELOID,
								  ObjectIdGetDatum(RelationGetRelid(rel)),
								  0, 0, 0);
	if (!HeapTupleIsValid(classTup))
		elog(ERROR, "cache lookup failed for relation %u",
			 RelationGetRelid(rel));
	classForm = (Form_pg_class) GETSTRUCT(classTup);

	if (classForm->relchecks == numchecks)
	{
		/* No catalog change needed, but force relcache inval anyway */
		CacheInvalidateRelcache(rel);
	}
	else
	{
		classForm->relchecks = numchecks;

		simple_heap_update(pgClass, &classTup->t_self, classTup);

		/* keep catalog indexes current */
		CatalogUpdateIndexes(pgClass, classTup);
	}

	heap_freetuple(classTup);
	heap_close(pgClass, RowExclusiveLock);
}
/*
 * Take a raw default and convert it to a cooked format ready for
 * storage.
 *
 * Parse state should be set up to recognize any vars that might appear
 * in the expression.  (Even though we plan to reject vars, it's more
 * user-friendly to give the correct error message than "unknown var".)
 *
 * If atttypid is not InvalidOid, coerce the expression to the specified
 * type (and typmod atttypmod).  attname is only needed in this case:
 * it is used in the error message, if any.
 *
 * Returns the transformed (and possibly coerced) expression.  An error is
 * raised if the expression contains column references, returns a set,
 * uses a subquery or an aggregate, or cannot be coerced to the column type.
 */
Node *
cookDefault(ParseState *pstate,
			Node *raw_default,
			Oid atttypid,
			int32 atttypmod,
			char *attname)
{
	Node	   *expr;

	Assert(raw_default != NULL);

	/*
	 * Transform raw parsetree to executable expression.
	 */
	expr = transformExpr(pstate, raw_default);

	/*
	 * Make sure default expr does not refer to any vars.
	 */
	if (contain_var_clause(expr))
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
			  errmsg("cannot use column references in default expression")));

	/*
	 * It can't return a set either.
	 */
	if (expression_returns_set(expr))
		ereport(ERROR,
				(errcode(ERRCODE_DATATYPE_MISMATCH),
				 errmsg("default expression must not return a set")));

	/*
	 * No subplans or aggregates, either...
	 */
	if (pstate->p_hasSubLinks)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("cannot use subquery in default expression")));
	if (pstate->p_hasAggs)
		ereport(ERROR,
				(errcode(ERRCODE_GROUPING_ERROR),
			 errmsg("cannot use aggregate function in default expression")));

	/*
	 * Coerce the expression to the correct type and typmod, if given.  This
	 * should match the parser's processing of non-defaulted expressions ---
	 * see transformAssignedExpr().
	 */
	if (OidIsValid(atttypid))
	{
		/* Capture the source type first; expr is overwritten (maybe NULL) */
		Oid			type_id = exprType(expr);

		expr = coerce_to_target_type(pstate, expr, type_id,
									 atttypid, atttypmod,
									 COERCION_ASSIGNMENT,
									 COERCE_IMPLICIT_CAST);
		if (expr == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_DATATYPE_MISMATCH),
					 errmsg("column \"%s\" is of type %s"
							" but default expression is of type %s",
							attname,
							format_type_be(atttypid),
							format_type_be(type_id)),
			   errhint("You will need to rewrite or cast the expression.")));
	}

	return expr;
}
/*
 * Convert a raw CHECK constraint expression into the cooked form that is
 * ready for storage.
 *
 * The parse state must already be set up to recognize any vars that might
 * appear in the expression.  relname is used only in an error message.
 *
 * Returns the transformed expression, coerced to boolean.  An error is
 * raised if the parse state's range table does not hold exactly one entry,
 * or if the expression uses a subquery or an aggregate.
 */
static Node *
cookConstraint(ParseState *pstate,
			   Node *raw_constraint,
			   char *relname)
{
	Node	   *cooked;

	/*
	 * Turn the raw parsetree into an executable expression and require it
	 * to yield a boolean result.
	 */
	cooked = coerce_to_boolean(pstate,
							   transformExpr(pstate, raw_constraint),
							   "CHECK");

	/* References to outside relations are not allowed */
	if (list_length(pstate->p_rtable) != 1)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
			errmsg("only table \"%s\" can be referenced in check constraint",
				   relname)));

	/* Neither are subplans ... */
	if (pstate->p_hasSubLinks)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("cannot use subquery in check constraint")));

	/* ... nor aggregates */
	if (pstate->p_hasAggs)
		ereport(ERROR,
				(errcode(ERRCODE_GROUPING_ERROR),
			   errmsg("cannot use aggregate function in check constraint")));

	return cooked;
}
1997-08-22 04:58:51 +02:00
/*
 * RemoveStatistics
 *		Delete pg_statistic entries belonging to a relation or to one of
 *		its columns.
 *
 * relid identifies the relation.  When attnum is zero every entry for the
 * relation is deleted; otherwise only the entry for that column number is.
 */
void
RemoveStatistics(Oid relid, AttrNumber attnum)
{
	Relation	statrel;
	SysScanDesc statscan;
	ScanKeyData skey[2];
	int			nkeys = 1;		/* always match on starelid */
	HeapTuple	stattup;

	statrel = heap_open(StatisticRelationId, RowExclusiveLock);

	/* First key: the owning relation */
	ScanKeyInit(&skey[0],
				Anum_pg_statistic_starelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(relid));

	/* Second key, only when a specific column was requested */
	if (attnum != 0)
	{
		ScanKeyInit(&skey[1],
					Anum_pg_statistic_staattnum,
					BTEqualStrategyNumber, F_INT2EQ,
					Int16GetDatum(attnum));
		nkeys = 2;
	}

	statscan = systable_beginscan(statrel, StatisticRelidAttnumIndexId, true,
								  SnapshotNow, nkeys, skey);

	/* Delete every tuple the scan returns */
	for (stattup = systable_getnext(statscan);
		 HeapTupleIsValid(stattup);
		 stattup = systable_getnext(statscan))
	{
		simple_heap_delete(statrel, &stattup->t_self);
	}

	systable_endscan(statscan);

	heap_close(statrel, RowExclusiveLock);
}
/*
 * RelationTruncateIndexes
 *		Reduce every index of the given heap relation to zero tuples.
 *
 * Each index is truncated and then reconstructed by index_build().
 * Caller must hold exclusive lock on rel.
 */
static void
RelationTruncateIndexes(Relation heapRelation)
{
	List	   *indexoids;
	ListCell   *lc;

	/* The relcache supplies the list of index OIDs for the rel */
	indexoids = RelationGetIndexList(heapRelation);

	foreach(lc, indexoids)
	{
		Oid			indexoid = lfirst_oid(lc);
		Relation	indexrel;
		IndexInfo  *info;

		/* Open the index; take exclusive lock, just to be sure */
		indexrel = index_open(indexoid, AccessExclusiveLock);

		/* Collect what index_build will need before touching the file */
		info = BuildIndexInfo(indexrel);

		/* Truncate the actual file (and discard buffers) */
		RelationTruncate(indexrel, 0);

		/*
		 * Initialize the index and rebuild it.  Note: we do not need to
		 * re-establish the pkey setting.
		 */
		index_build(heapRelation, indexrel, info, false);

		/* Done with this index; close it without releasing a lock */
		index_close(indexrel, NoLock);
	}
}
/*
 * heap_truncate
 *		Delete all data within all the relations named by relids.
 *
 * relids is a list of relation OIDs.  Each relation's toast table, if it
 * has one, is truncated as well, and so are the indexes of every relation
 * processed.
 *
 * This is not transaction-safe!  There is another, transaction-safe
 * implementation in commands/tablecmds.c.  We now use this only for
 * ON COMMIT truncation of temporary tables, where it doesn't matter.
 */
void
heap_truncate(List *relids)
{
	List	   *targets = NIL;
	ListCell   *lc;

	/* Open each relation, taking exclusive access, and remember it */
	foreach(lc, relids)
	{
		Oid			heapoid = lfirst_oid(lc);
		Relation	heaprel;
		Oid			toastoid;

		heaprel = heap_open(heapoid, AccessExclusiveLock);
		targets = lappend(targets, heaprel);

		/* A toast table, when present, goes on the list right after */
		toastoid = heaprel->rd_rel->reltoastrelid;
		if (OidIsValid(toastoid))
		{
			Relation	toastrel;

			toastrel = heap_open(toastoid, AccessExclusiveLock);
			targets = lappend(targets, toastrel);
		}
	}

	/* Don't allow truncate on tables that are referenced by foreign keys */
	heap_truncate_check_FKs(targets, true);

	/* OK to do it */
	foreach(lc, targets)
	{
		Relation	target = (Relation) lfirst(lc);

		/* Truncate the actual file (and discard buffers) */
		RelationTruncate(target, 0);

		/* Any indexes the relation has get truncated too */
		RelationTruncateIndexes(target);

		/*
		 * Close the relation, but keep exclusive lock on it until commit.
		 */
		heap_close(target, NoLock);
	}
}
/*
 * heap_truncate_check_FKs
 *		Raise an error if any relation outside the given list has a foreign
 *		key referencing a relation that is about to be truncated.
 *
 * Such FKs are disallowed (self-referential ones, and ones between members
 * of the list, are fine) since the whole point of TRUNCATE is to not scan
 * the individual rows to be thrown away.
 *
 * relations is a list of open Relations.  This is split out so it can be
 * shared by both implementations of truncate.  Caller should already hold
 * a suitable lock on the relations.
 *
 * tempTables is only used to select an appropriate error message.
 */
void
heap_truncate_check_FKs(List *relations, bool tempTables)
{
	List	   *oids = NIL;
	ListCell   *lc;

	/*
	 * Collect the OIDs of the interesting relations.
	 *
	 * A relation with no triggers can neither have FKs nor be referenced by
	 * a FK from another table, so such relations are left out.
	 */
	foreach(lc, relations)
	{
		Relation	rel = (Relation) lfirst(lc);

		if (rel->rd_rel->reltriggers != 0)
			oids = lappend_oid(oids, RelationGetRelid(rel));
	}

	/*
	 * Fast path: if no relation has triggers, none has FKs either.
	 */
	if (oids == NIL)
		return;

	/*
	 * Otherwise pg_constraint must be scanned.  One pass considering all the
	 * relations together suffices to learn whether there is any problem.
	 */
	if (heap_truncate_find_FKs(oids) == NIL)
		return;

	/*
	 * There is a problem.  Repeat the scan once per relation to identify a
	 * particular pair of relations to complain about.  This is pretty slow,
	 * but performance shouldn't matter much in a failure path.  Doing it this
	 * way ensures that the message produced is not dependent on chance row
	 * locations within pg_constraint.
	 */
	foreach(lc, oids)
	{
		Oid			referenced = lfirst_oid(lc);
		List	   *referencers;
		ListCell   *lc2;

		referencers = heap_truncate_find_FKs(list_make1_oid(referenced));

		foreach(lc2, referencers)
		{
			Oid			referencing = lfirst_oid(lc2);
			char	   *referenced_name;
			char	   *referencing_name;

			/* A referencer that is itself being truncated is harmless */
			if (list_member_oid(oids, referencing))
				continue;

			referenced_name = get_rel_name(referenced);
			referencing_name = get_rel_name(referencing);

			if (tempTables)
				ereport(ERROR,
						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						 errmsg("unsupported ON COMMIT and foreign key combination"),
						 errdetail("Table \"%s\" references \"%s\", but they do not have the same ON COMMIT setting.",
								   referencing_name, referenced_name)));
			else
				ereport(ERROR,
						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						 errmsg("cannot truncate a table referenced in a foreign key constraint"),
						 errdetail("Table \"%s\" references \"%s\".",
								   referencing_name, referenced_name),
						 errhint("Truncate table \"%s\" at the same time, "
								 "or use TRUNCATE ... CASCADE.",
								 referencing_name)));
		}
	}
}
/*
 * heap_truncate_find_FKs
 *		Find relations having foreign keys referencing any of the given rels
 *
 * relationIds and the returned value are both lists of relation OIDs.  The
 * result has no duplicates, never contains a rel that was already in the
 * input list, and is sorted in OID order.  (The sort order is enforced
 * mainly to guarantee consistent behavior in the regression tests; we don't
 * want behavior to change depending on chance locations of rows in
 * pg_constraint.)
 *
 * Note: caller should already have appropriate lock on all rels mentioned
 * in relationIds.  Since adding or dropping an FK requires exclusive lock
 * on both rels, this ensures that the answer will be stable.
 */
List *
heap_truncate_find_FKs(List *relationIds)
{
	List	   *referencers = NIL;
	Relation	conrel;
	SysScanDesc conscan;
	HeapTuple	contup;

	/*
	 * pg_constraint has to be scanned.  Right now that is a seqscan, because
	 * there is no available index on confrelid.
	 */
	conrel = heap_open(ConstraintRelationId, AccessShareLock);

	conscan = systable_beginscan(conrel, InvalidOid, false,
								 SnapshotNow, 0, NULL);

	for (contup = systable_getnext(conscan);
		 HeapTupleIsValid(contup);
		 contup = systable_getnext(conscan))
	{
		Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(contup);

		/*
		 * Keep the referencing rel only if this is a foreign key, it points
		 * at one of our tables, and the referencer is not itself in the
		 * input list.  Duplicates are weeded out by the insert routine.
		 */
		if (con->contype == CONSTRAINT_FOREIGN &&
			list_member_oid(relationIds, con->confrelid) &&
			!list_member_oid(relationIds, con->conrelid))
		{
			referencers = insert_ordered_unique_oid(referencers,
													con->conrelid);
		}
	}

	systable_endscan(conscan);
	heap_close(conrel, AccessShareLock);

	return referencers;
}
/*
 * insert_ordered_unique_oid
 *		Add an Oid to a sorted list of Oids, keeping the list in ascending
 *		order and never storing the same value twice.
 *
 * Returns the resulting list; this is a new list head when the datum is
 * placed at the front, otherwise the list that was passed in.
 *
 * Building the ordered list this way is O(N^2), but with a pretty small
 * constant, so for the number of entries we expect it will probably be
 * faster than trying to apply qsort().  It seems unlikely someone would be
 * trying to truncate a table with thousands of dependent tables ...
 */
static List *
insert_ordered_unique_oid(List *list, Oid datum)
{
	Oid			head_oid;
	ListCell   *after;
	ListCell   *next;

	/* An empty list simply gains its first member */
	if (list == NIL)
		return lcons_oid(datum, list);

	/* Compare against the first entry: duplicate, or new front element? */
	head_oid = linitial_oid(list);
	if (datum == head_oid)
		return list;			/* duplicate, so don't insert */
	if (datum < head_oid)
		return lcons_oid(datum, list);

	/* Otherwise walk forward to find the cell the datum belongs after */
	after = list_head(list);
	for (next = lnext(after); next != NULL; next = lnext(after))
	{
		Oid			member = lfirst_oid(next);

		if (datum == member)
			return list;		/* duplicate, so don't insert */
		if (datum < member)
			break;				/* goes between 'after' and 'next' */
		after = next;
	}

	/* Splice the datum in right behind 'after' */
	lappend_cell_oid(list, after, datum);

	return list;
}