diff --git a/doc/src/sgml/ref/alter_table.sgml b/doc/src/sgml/ref/alter_table.sgml index 17a1d34d08..bba690d523 100644 --- a/doc/src/sgml/ref/alter_table.sgml +++ b/doc/src/sgml/ref/alter_table.sgml @@ -43,6 +43,7 @@ ALTER TABLE name ALTER [ COLUMN ] column RESET ( attribute_option [, ... ] ) ALTER [ COLUMN ] column SET STORAGE { PLAIN | EXTERNAL | EXTENDED | MAIN } ADD table_constraint + ADD table_constraint_using_index DROP CONSTRAINT [ IF EXISTS ] constraint_name [ RESTRICT | CASCADE ] DISABLE TRIGGER [ trigger_name | ALL | USER ] ENABLE TRIGGER [ trigger_name | ALL | USER ] @@ -62,6 +63,12 @@ ALTER TABLE name NO INHERIT parent_table OWNER TO new_owner SET TABLESPACE new_tablespace + +and table_constraint_using_index is: + + [ CONSTRAINT constraint_name ] + { UNIQUE | PRIMARY KEY } USING INDEX index_name + [ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ] @@ -229,6 +236,57 @@ ALTER TABLE name + + ADD table_constraint_using_index + + + This form adds a new PRIMARY KEY or UNIQUE + constraint to a table based on an existing unique index. All the + columns of the index will be included in the constraint. + + + + The index cannot have expression columns nor be a partial index. + Also, it must be a b-tree index with default sort ordering. These + restrictions ensure that the index is equivalent to one that would be + built by a regular ADD PRIMARY KEY or ADD UNIQUE + command. + + + + If PRIMARY KEY is specified, and the index's columns are not + already marked NOT NULL, then this command will attempt to + do ALTER COLUMN SET NOT NULL against each such column. + That requires a full table scan to verify the column(s) contain no + nulls. In all other cases, this is a fast operation. + + + + If a constraint name is provided then the index will be renamed to match + the constraint name. Otherwise the constraint will be named the same as + the index. + + + + After this command is executed, the index is owned by the + constraint, in the same way as if the index had been built by + a regular ADD PRIMARY KEY or ADD UNIQUE + command. In particular, dropping the constraint will make the index + disappear too. + + + + + Adding a constraint using an existing index can be helpful in + situations where a new constraint needs to be added without blocking + table updates for a long time. To do that, create the index using + CREATE INDEX CONCURRENTLY, and then install it as an + official constraint using this syntax. See the example below. + + + + + DROP CONSTRAINT [ IF EXISTS ] @@ -920,13 +978,24 @@ ALTER TABLE myschema.distributors SET SCHEMA yourschema; + + To recreate a primary key constraint, without blocking updates while the + index is rebuilt: + +CREATE UNIQUE INDEX CONCURRENTLY dist_id_temp_idx on distributors (dist_id); +ALTER TABLE distributors DROP CONSTRAINT distributors_pkey, + ADD CONSTRAINT distributors_pkey PRIMARY KEY USING INDEX dist_id_temp_idx; + + + Compatibility - The forms ADD, DROP, SET DEFAULT, + The forms ADD (without USING INDEX), + DROP, SET DEFAULT, and SET DATA TYPE (without USING) conform with the SQL standard. The other forms are PostgreSQL extensions of the SQL standard. @@ -940,4 +1009,12 @@ ALTER TABLE myschema.distributors SET SCHEMA yourschema; extension of SQL, which disallows zero-column tables. + + + See Also + + + + + diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index 7b64a7af44..86fd11b496 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -82,6 +82,7 @@ typedef struct } v_i_state; /* non-export function prototypes */ +static bool relationHasPrimaryKey(Relation rel); static TupleDesc ConstructTupleDescriptor(Relation heapRelation, IndexInfo *indexInfo, List *indexColNames, @@ -117,6 +118,141 @@ static void RemoveReindexPending(Oid indexOid); static void ResetReindexPending(void); +/* + * relationHasPrimaryKey + * See whether an existing relation has a primary key. + * + * Caller must have suitable lock on the relation. + */ +static bool +relationHasPrimaryKey(Relation rel) +{ + bool result = false; + List *indexoidlist; + ListCell *indexoidscan; + + /* + * Get the list of index OIDs for the table from the relcache, and look up + * each one in the pg_index syscache until we find one marked primary key + * (hopefully there isn't more than one such). + */ + indexoidlist = RelationGetIndexList(rel); + + foreach(indexoidscan, indexoidlist) + { + Oid indexoid = lfirst_oid(indexoidscan); + HeapTuple indexTuple; + + indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid)); + if (!HeapTupleIsValid(indexTuple)) /* should not happen */ + elog(ERROR, "cache lookup failed for index %u", indexoid); + result = ((Form_pg_index) GETSTRUCT(indexTuple))->indisprimary; + ReleaseSysCache(indexTuple); + if (result) + break; + } + + list_free(indexoidlist); + + return result; +} + +/* + * index_check_primary_key + * Apply special checks needed before creating a PRIMARY KEY index + * + * This processing used to be in DefineIndex(), but has been split out + * so that it can be applied during ALTER TABLE ADD PRIMARY KEY USING INDEX. + * + * We check for a pre-existing primary key, and that all columns of the index + * are simple column references (not expressions), and that all those + * columns are marked NOT NULL. If they aren't (which can only happen during + * ALTER TABLE ADD CONSTRAINT, since the parser forces such columns to be + * created NOT NULL during CREATE TABLE), do an ALTER SET NOT NULL to mark + * them so --- or fail if they are not in fact nonnull. + * + * Caller had better have at least ShareLock on the table, else the not-null + * checking isn't trustworthy. + */ +void +index_check_primary_key(Relation heapRel, + IndexInfo *indexInfo, + bool is_alter_table) +{ + List *cmds; + int i; + + /* + * If ALTER TABLE, check that there isn't already a PRIMARY KEY. In + * CREATE TABLE, we have faith that the parser rejected multiple pkey + * clauses; and CREATE INDEX doesn't have a way to say PRIMARY KEY, so + * it's no problem either. + */ + if (is_alter_table && + relationHasPrimaryKey(heapRel)) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("multiple primary keys for table \"%s\" are not allowed", + RelationGetRelationName(heapRel)))); + } + + /* + * Check that all of the attributes in a primary key are marked as not + * null, otherwise attempt to ALTER TABLE .. SET NOT NULL + */ + cmds = NIL; + for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++) + { + AttrNumber attnum = indexInfo->ii_KeyAttrNumbers[i]; + HeapTuple atttuple; + Form_pg_attribute attform; + + if (attnum == 0) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("primary keys cannot be expressions"))); + + /* System attributes are never null, so no need to check */ + if (attnum < 0) + continue; + + atttuple = SearchSysCache2(ATTNUM, + ObjectIdGetDatum(RelationGetRelid(heapRel)), + Int16GetDatum(attnum)); + if (!HeapTupleIsValid(atttuple)) + elog(ERROR, "cache lookup failed for attribute %d of relation %u", + attnum, RelationGetRelid(heapRel)); + attform = (Form_pg_attribute) GETSTRUCT(atttuple); + + if (!attform->attnotnull) + { + /* Add a subcommand to make this one NOT NULL */ + AlterTableCmd *cmd = makeNode(AlterTableCmd); + + cmd->subtype = AT_SetNotNull; + cmd->name = pstrdup(NameStr(attform->attname)); + cmds = lappend(cmds, cmd); + } + + ReleaseSysCache(atttuple); + } + + /* + * XXX: Shouldn't the ALTER TABLE .. SET NOT NULL cascade to child + * tables? Currently, since the PRIMARY KEY itself doesn't cascade, + * we don't cascade the notnull constraint(s) either; but this is + * pretty debatable. + * + * XXX: possible future improvement: when being called from ALTER + * TABLE, it would be more efficient to merge this with the outer + * ALTER TABLE, so as to avoid two scans. But that seems to + * complicate DefineIndex's API unduly. + */ + if (cmds) + AlterTableInternal(RelationGetRelid(heapRel), cmds, false); +} + /* * ConstructTupleDescriptor * @@ -492,7 +628,7 @@ UpdateIndexRelation(Oid indexoid, /* * index_create * - * heapRelationId: OID of table to build index on + * heapRelation: table to build index on (suitably locked by caller) * indexRelationName: what it say * indexRelationId: normally, pass InvalidOid to let this routine * generate an OID for the index. During bootstrap this may be @@ -505,7 +641,7 @@ UpdateIndexRelation(Oid indexoid, * coloptions: array of per-index-column indoption settings * reloptions: AM-specific options * isprimary: index is a PRIMARY KEY - * isconstraint: index is owned by a PRIMARY KEY or UNIQUE constraint + * isconstraint: index is owned by PRIMARY KEY, UNIQUE, or EXCLUSION constraint * deferrable: constraint is DEFERRABLE * initdeferred: constraint is INITIALLY DEFERRED * allow_system_table_mods: allow table to be a system catalog @@ -518,7 +654,7 @@ UpdateIndexRelation(Oid indexoid, * Returns the OID of the created index. */ Oid -index_create(Oid heapRelationId, +index_create(Relation heapRelation, const char *indexRelationName, Oid indexRelationId, IndexInfo *indexInfo, @@ -536,8 +672,8 @@ index_create(Oid heapRelationId, bool skip_build, bool concurrent) { + Oid heapRelationId = RelationGetRelid(heapRelation); Relation pg_class; - Relation heapRelation; Relation indexRelation; TupleDesc indexTupDesc; bool shared_relation; @@ -551,14 +687,6 @@ index_create(Oid heapRelationId, pg_class = heap_open(RelationRelationId, RowExclusiveLock); - /* - * Only SELECT ... FOR UPDATE/SHARE are allowed while doing a standard - * index build; but for concurrent builds we allow INSERT/UPDATE/DELETE - * (but not VACUUM). - */ - heapRelation = heap_open(heapRelationId, - (concurrent ? ShareUpdateExclusiveLock : ShareLock)); - /* * The index will be in the same namespace as its parent table, and is * shared across databases if and only if the parent is. Likewise, it @@ -734,9 +862,9 @@ index_create(Oid heapRelationId, * Register constraint and dependencies for the index. * * If the index is from a CONSTRAINT clause, construct a pg_constraint - * entry. The index is then linked to the constraint, which in turn is - * linked to the table. If it's not a CONSTRAINT, make the dependency - * directly on the table. + * entry. The index will be linked to the constraint, which in turn is + * linked to the table. If it's not a CONSTRAINT, we need to make a + * dependency directly on the table. * * We don't need a dependency on the namespace, because there'll be an * indirect dependency via our parent table. @@ -756,7 +884,6 @@ index_create(Oid heapRelationId, if (isconstraint) { char constraintType; - Oid conOid; if (isprimary) constraintType = CONSTRAINT_PRIMARY; @@ -770,77 +897,16 @@ index_create(Oid heapRelationId, constraintType = 0; /* keep compiler quiet */ } - /* primary/unique constraints shouldn't have any expressions */ - if (indexInfo->ii_Expressions && - constraintType != CONSTRAINT_EXCLUSION) - elog(ERROR, "constraints cannot have index expressions"); - - conOid = CreateConstraintEntry(indexRelationName, - namespaceId, - constraintType, - deferrable, - initdeferred, - heapRelationId, - indexInfo->ii_KeyAttrNumbers, - indexInfo->ii_NumIndexAttrs, - InvalidOid, /* no domain */ - indexRelationId, /* index OID */ - InvalidOid, /* no foreign key */ - NULL, - NULL, - NULL, - NULL, - 0, - ' ', - ' ', - ' ', - indexInfo->ii_ExclusionOps, - NULL, /* no check constraint */ - NULL, - NULL, - true, /* islocal */ - 0); /* inhcount */ - - referenced.classId = ConstraintRelationId; - referenced.objectId = conOid; - referenced.objectSubId = 0; - - recordDependencyOn(&myself, &referenced, DEPENDENCY_INTERNAL); - - /* - * If the constraint is deferrable, create the deferred uniqueness - * checking trigger. (The trigger will be given an internal - * dependency on the constraint by CreateTrigger, so there's no - * need to do anything more here.) - */ - if (deferrable) - { - RangeVar *heapRel; - CreateTrigStmt *trigger; - - heapRel = makeRangeVar(get_namespace_name(namespaceId), - pstrdup(RelationGetRelationName(heapRelation)), - -1); - - trigger = makeNode(CreateTrigStmt); - trigger->trigname = (isprimary ? "PK_ConstraintTrigger" : - "Unique_ConstraintTrigger"); - trigger->relation = heapRel; - trigger->funcname = SystemFuncName("unique_key_recheck"); - trigger->args = NIL; - trigger->row = true; - trigger->timing = TRIGGER_TYPE_AFTER; - trigger->events = TRIGGER_TYPE_INSERT | TRIGGER_TYPE_UPDATE; - trigger->columns = NIL; - trigger->whenClause = NULL; - trigger->isconstraint = true; - trigger->deferrable = true; - trigger->initdeferred = initdeferred; - trigger->constrrel = NULL; - - (void) CreateTrigger(trigger, NULL, conOid, indexRelationId, - true); - } + index_constraint_create(heapRelation, + indexRelationId, + indexInfo, + indexRelationName, + constraintType, + deferrable, + initdeferred, + false, /* already marked primary */ + false, /* pg_index entry is OK */ + allow_system_table_mods); } else { @@ -970,15 +1036,211 @@ index_create(Oid heapRelationId, } /* - * Close the heap and index; but we keep the locks that we acquired above - * until end of transaction. + * Close the index; but we keep the lock that we acquired above until end + * of transaction. Closing the heap is caller's responsibility. */ index_close(indexRelation, NoLock); - heap_close(heapRelation, NoLock); return indexRelationId; } +/* + * index_constraint_create + * + * Set up a constraint associated with an index + * + * heapRelation: table owning the index (must be suitably locked by caller) + * indexRelationId: OID of the index + * indexInfo: same info executor uses to insert into the index + * constraintName: what it say (generally, should match name of index) + * constraintType: one of CONSTRAINT_PRIMARY, CONSTRAINT_UNIQUE, or + * CONSTRAINT_EXCLUSION + * deferrable: constraint is DEFERRABLE + * initdeferred: constraint is INITIALLY DEFERRED + * mark_as_primary: if true, set flags to mark index as primary key + * update_pgindex: if true, update pg_index row (else caller's done that) + * allow_system_table_mods: allow table to be a system catalog + */ +void +index_constraint_create(Relation heapRelation, + Oid indexRelationId, + IndexInfo *indexInfo, + const char *constraintName, + char constraintType, + bool deferrable, + bool initdeferred, + bool mark_as_primary, + bool update_pgindex, + bool allow_system_table_mods) +{ + Oid namespaceId = RelationGetNamespace(heapRelation); + ObjectAddress myself, + referenced; + Oid conOid; + + /* constraint creation support doesn't work while bootstrapping */ + Assert(!IsBootstrapProcessingMode()); + + /* enforce system-table restriction */ + if (!allow_system_table_mods && + IsSystemRelation(heapRelation) && + IsNormalProcessingMode()) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("user-defined indexes on system catalog tables are not supported"))); + + /* primary/unique constraints shouldn't have any expressions */ + if (indexInfo->ii_Expressions && + constraintType != CONSTRAINT_EXCLUSION) + elog(ERROR, "constraints cannot have index expressions"); + + /* + * Construct a pg_constraint entry. + */ + conOid = CreateConstraintEntry(constraintName, + namespaceId, + constraintType, + deferrable, + initdeferred, + RelationGetRelid(heapRelation), + indexInfo->ii_KeyAttrNumbers, + indexInfo->ii_NumIndexAttrs, + InvalidOid, /* no domain */ + indexRelationId, /* index OID */ + InvalidOid, /* no foreign key */ + NULL, + NULL, + NULL, + NULL, + 0, + ' ', + ' ', + ' ', + indexInfo->ii_ExclusionOps, + NULL, /* no check constraint */ + NULL, + NULL, + true, /* islocal */ + 0); /* inhcount */ + + /* + * Register the index as internally dependent on the constraint. + * + * Note that the constraint has a dependency on the table, so when this + * path is taken we do not need any direct dependency from the index to + * the table. (But if one exists, no great harm is done, either. So in + * the case where we're manufacturing a constraint for a pre-existing + * index, we don't bother to try to get rid of the existing index->table + * dependency.) + */ + myself.classId = RelationRelationId; + myself.objectId = indexRelationId; + myself.objectSubId = 0; + + referenced.classId = ConstraintRelationId; + referenced.objectId = conOid; + referenced.objectSubId = 0; + + recordDependencyOn(&myself, &referenced, DEPENDENCY_INTERNAL); + + /* + * If the constraint is deferrable, create the deferred uniqueness + * checking trigger. (The trigger will be given an internal + * dependency on the constraint by CreateTrigger.) + */ + if (deferrable) + { + RangeVar *heapRel; + CreateTrigStmt *trigger; + + heapRel = makeRangeVar(get_namespace_name(namespaceId), + pstrdup(RelationGetRelationName(heapRelation)), + -1); + + trigger = makeNode(CreateTrigStmt); + trigger->trigname = (constraintType == CONSTRAINT_PRIMARY) ? + "PK_ConstraintTrigger" : + "Unique_ConstraintTrigger"; + trigger->relation = heapRel; + trigger->funcname = SystemFuncName("unique_key_recheck"); + trigger->args = NIL; + trigger->row = true; + trigger->timing = TRIGGER_TYPE_AFTER; + trigger->events = TRIGGER_TYPE_INSERT | TRIGGER_TYPE_UPDATE; + trigger->columns = NIL; + trigger->whenClause = NULL; + trigger->isconstraint = true; + trigger->deferrable = true; + trigger->initdeferred = initdeferred; + trigger->constrrel = NULL; + + (void) CreateTrigger(trigger, NULL, conOid, indexRelationId, true); + } + + /* + * If needed, mark the table as having a primary key. We assume it can't + * have been so marked already, so no need to clear the flag in the other + * case. + * + * Note: this might better be done by callers. We do it here to avoid + * exposing index_update_stats() globally, but that wouldn't be necessary + * if relhaspkey went away. + */ + if (mark_as_primary) + index_update_stats(heapRelation, + true, + true, + false, + InvalidOid, + heapRelation->rd_rel->reltuples); + + /* + * If needed, mark the index as primary and/or deferred in pg_index. + * + * Note: since this is a transactional update, it's unsafe against + * concurrent SnapshotNow scans of pg_index. When making an existing + * index into a constraint, caller must have a table lock that prevents + * concurrent table updates, and there is a risk that concurrent readers + * of the table will miss seeing this index at all. + */ + if (update_pgindex && (mark_as_primary || deferrable)) + { + Relation pg_index; + HeapTuple indexTuple; + Form_pg_index indexForm; + bool dirty = false; + + pg_index = heap_open(IndexRelationId, RowExclusiveLock); + + indexTuple = SearchSysCacheCopy1(INDEXRELID, + ObjectIdGetDatum(indexRelationId)); + if (!HeapTupleIsValid(indexTuple)) + elog(ERROR, "cache lookup failed for index %u", indexRelationId); + indexForm = (Form_pg_index) GETSTRUCT(indexTuple); + + if (mark_as_primary && !indexForm->indisprimary) + { + indexForm->indisprimary = true; + dirty = true; + } + + if (deferrable && indexForm->indimmediate) + { + indexForm->indimmediate = false; + dirty = true; + } + + if (dirty) + { + simple_heap_update(pg_index, &indexTuple->t_self, indexTuple); + CatalogUpdateIndexes(pg_index, indexTuple); + } + + heap_freetuple(indexTuple); + heap_close(pg_index, RowExclusiveLock); + } +} + /* * index_drop * diff --git a/src/backend/catalog/toasting.c b/src/backend/catalog/toasting.c index 142beaeb6e..c4be3a9ae3 100644 --- a/src/backend/catalog/toasting.c +++ b/src/backend/catalog/toasting.c @@ -115,6 +115,7 @@ create_toast_table(Relation rel, Oid toastOid, Oid toastIndexOid, Datum reloptio TupleDesc tupdesc; bool shared_relation; bool mapped_relation; + Relation toast_rel; Relation class_rel; Oid toast_relid; Oid toast_idxid; @@ -229,9 +230,12 @@ create_toast_table(Relation rel, Oid toastOid, Oid toastIndexOid, Datum reloptio false); Assert(toast_relid != InvalidOid); - /* make the toast relation visible, else index creation will fail */ + /* make the toast relation visible, else heap_open will fail */ CommandCounterIncrement(); + /* ShareLock is not really needed here, but take it anyway */ + toast_rel = heap_open(toast_relid, ShareLock); + /* * Create unique index on chunk_id, chunk_seq. * @@ -266,7 +270,7 @@ create_toast_table(Relation rel, Oid toastOid, Oid toastIndexOid, Datum reloptio coloptions[0] = 0; coloptions[1] = 0; - toast_idxid = index_create(toast_relid, toast_idxname, toastIndexOid, + toast_idxid = index_create(toast_rel, toast_idxname, toastIndexOid, indexInfo, list_make2("chunk_id", "chunk_seq"), BTREE_AM_OID, @@ -275,6 +279,8 @@ create_toast_table(Relation rel, Oid toastOid, Oid toastIndexOid, Datum reloptio true, false, false, false, true, false, false); + heap_close(toast_rel, NoLock); + /* * Store the toast table's OID in the parent relation's pg_class row */ diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c index 7a6a4c33ad..94ed437002 100644 --- a/src/backend/commands/indexcmds.c +++ b/src/backend/commands/indexcmds.c @@ -69,7 +69,6 @@ static void ComputeIndexAttrs(IndexInfo *indexInfo, static Oid GetIndexOpClass(List *opclass, Oid attrType, char *accessMethodName, Oid accessMethodId); static char *ChooseIndexNameAddition(List *colnames); -static bool relationHasPrimaryKey(Relation rel); /* @@ -320,92 +319,6 @@ DefineIndex(RangeVar *heapRelation, if (predicate) CheckPredicate(predicate); - /* - * Extra checks when creating a PRIMARY KEY index. - */ - if (primary) - { - List *cmds; - ListCell *keys; - - /* - * If ALTER TABLE, check that there isn't already a PRIMARY KEY. In - * CREATE TABLE, we have faith that the parser rejected multiple pkey - * clauses; and CREATE INDEX doesn't have a way to say PRIMARY KEY, so - * it's no problem either. - */ - if (is_alter_table && - relationHasPrimaryKey(rel)) - { - ereport(ERROR, - (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("multiple primary keys for table \"%s\" are not allowed", - RelationGetRelationName(rel)))); - } - - /* - * Check that all of the attributes in a primary key are marked as not - * null, otherwise attempt to ALTER TABLE .. SET NOT NULL - */ - cmds = NIL; - foreach(keys, attributeList) - { - IndexElem *key = (IndexElem *) lfirst(keys); - HeapTuple atttuple; - - if (!key->name) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("primary keys cannot be expressions"))); - - /* System attributes are never null, so no problem */ - if (SystemAttributeByName(key->name, rel->rd_rel->relhasoids)) - continue; - - atttuple = SearchSysCacheAttName(relationId, key->name); - if (HeapTupleIsValid(atttuple)) - { - if (!((Form_pg_attribute) GETSTRUCT(atttuple))->attnotnull) - { - /* Add a subcommand to make this one NOT NULL */ - AlterTableCmd *cmd = makeNode(AlterTableCmd); - - cmd->subtype = AT_SetNotNull; - cmd->name = key->name; - - cmds = lappend(cmds, cmd); - } - ReleaseSysCache(atttuple); - } - else - { - /* - * This shouldn't happen during CREATE TABLE, but can happen - * during ALTER TABLE. Keep message in sync with - * transformIndexConstraints() in parser/parse_utilcmd.c. - */ - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_COLUMN), - errmsg("column \"%s\" named in key does not exist", - key->name))); - } - } - - /* - * XXX: Shouldn't the ALTER TABLE .. SET NOT NULL cascade to child - * tables? Currently, since the PRIMARY KEY itself doesn't cascade, - * we don't cascade the notnull constraint(s) either; but this is - * pretty debatable. - * - * XXX: possible future improvement: when being called from ALTER - * TABLE, it would be more efficient to merge this with the outer - * ALTER TABLE, so as to avoid two scans. But that seems to - * complicate DefineIndex's API unduly. - */ - if (cmds) - AlterTableInternal(relationId, cmds, false); - } - /* * Parse AM-specific options, convert to text array form, validate. */ @@ -439,6 +352,12 @@ DefineIndex(RangeVar *heapRelation, accessMethodName, accessMethodId, amcanorder, isconstraint); + /* + * Extra checks when creating a PRIMARY KEY index. + */ + if (primary) + index_check_primary_key(rel, indexInfo, is_alter_table); + /* * Report index creation if appropriate (delay this till after most of the * error checks) @@ -466,17 +385,12 @@ DefineIndex(RangeVar *heapRelation, indexRelationName, RelationGetRelationName(rel)))); } - /* save lockrelid and locktag for below, then close rel */ - heaprelid = rel->rd_lockInfo.lockRelId; - SET_LOCKTAG_RELATION(heaplocktag, heaprelid.dbId, heaprelid.relId); - heap_close(rel, NoLock); - /* * Make the catalog entries for the index, including constraints. Then, if * not skip_build || concurrent, actually build the index. */ indexRelationId = - index_create(relationId, indexRelationName, indexRelationId, + index_create(rel, indexRelationName, indexRelationId, indexInfo, indexColNames, accessMethodId, tablespaceId, classObjectId, coloptions, reloptions, primary, @@ -486,7 +400,16 @@ DefineIndex(RangeVar *heapRelation, concurrent); if (!concurrent) - return; /* We're done, in the standard case */ + { + /* Close the heap and we're done, in the non-concurrent case */ + heap_close(rel, NoLock); + return; + } + + /* save lockrelid and locktag for below, then close rel */ + heaprelid = rel->rd_lockInfo.lockRelId; + SET_LOCKTAG_RELATION(heaplocktag, heaprelid.dbId, heaprelid.relId); + heap_close(rel, NoLock); /* * For a concurrent build, it's important to make the catalog entries @@ -1531,44 +1454,6 @@ ChooseIndexColumnNames(List *indexElems) return result; } -/* - * relationHasPrimaryKey - - * - * See whether an existing relation has a primary key. - */ -static bool -relationHasPrimaryKey(Relation rel) -{ - bool result = false; - List *indexoidlist; - ListCell *indexoidscan; - - /* - * Get the list of index OIDs for the table from the relcache, and look up - * each one in the pg_index syscache until we find one marked primary key - * (hopefully there isn't more than one such). - */ - indexoidlist = RelationGetIndexList(rel); - - foreach(indexoidscan, indexoidlist) - { - Oid indexoid = lfirst_oid(indexoidscan); - HeapTuple indexTuple; - - indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid)); - if (!HeapTupleIsValid(indexTuple)) /* should not happen */ - elog(ERROR, "cache lookup failed for index %u", indexoid); - result = ((Form_pg_index) GETSTRUCT(indexTuple))->indisprimary; - ReleaseSysCache(indexTuple); - if (result) - break; - } - - list_free(indexoidlist); - - return result; -} - /* * ReindexIndex * Recreate a specific index. diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index a9bb8351bc..6726ca9733 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -319,6 +319,8 @@ static void ATExecAddIndex(AlteredTableInfo *tab, Relation rel, static void ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, Constraint *newConstraint, bool recurse, LOCKMODE lockmode); +static void ATExecAddIndexConstraint(AlteredTableInfo *tab, Relation rel, + IndexStmt *stmt, LOCKMODE lockmode); static void ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, Constraint *constr, @@ -2594,6 +2596,7 @@ AlterTableGetLockLevel(List *cmds) case AT_DisableTrigAll: case AT_DisableTrigUser: case AT_AddIndex: /* from ADD CONSTRAINT */ + case AT_AddIndexConstraint: cmd_lockmode = ShareRowExclusiveLock; break; @@ -2811,6 +2814,12 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, cmd->subtype = AT_AddConstraintRecurse; pass = AT_PASS_ADD_CONSTR; break; + case AT_AddIndexConstraint: /* ADD CONSTRAINT USING INDEX */ + ATSimplePermissions(rel, ATT_TABLE); + /* This command never recurses */ + /* No command-specific prep needed */ + pass = AT_PASS_ADD_CONSTR; + break; case AT_DropConstraint: /* DROP CONSTRAINT */ ATSimplePermissions(rel, ATT_TABLE); /* Recursion occurs during execution phase */ @@ -3042,6 +3051,9 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel, ATExecAddConstraint(wqueue, tab, rel, (Constraint *) cmd->def, true, lockmode); break; + case AT_AddIndexConstraint: /* ADD CONSTRAINT USING INDEX */ + ATExecAddIndexConstraint(tab, rel, (IndexStmt *) cmd->def, lockmode); + break; case AT_DropConstraint: /* DROP CONSTRAINT */ ATExecDropConstraint(rel, cmd->name, cmd->behavior, false, false, @@ -5009,6 +5021,76 @@ ATExecAddIndex(AlteredTableInfo *tab, Relation rel, false); } +/* + * ALTER TABLE ADD CONSTRAINT USING INDEX + */ +static void +ATExecAddIndexConstraint(AlteredTableInfo *tab, Relation rel, + IndexStmt *stmt, LOCKMODE lockmode) +{ + Oid index_oid = stmt->indexOid; + Relation indexRel; + char *indexName; + IndexInfo *indexInfo; + char *constraintName; + char constraintType; + + Assert(IsA(stmt, IndexStmt)); + Assert(OidIsValid(index_oid)); + Assert(stmt->isconstraint); + + indexRel = index_open(index_oid, AccessShareLock); + + indexName = pstrdup(RelationGetRelationName(indexRel)); + + indexInfo = BuildIndexInfo(indexRel); + + /* this should have been checked at parse time */ + if (!indexInfo->ii_Unique) + elog(ERROR, "index \"%s\" is not unique", indexName); + + /* + * Determine name to assign to constraint. We require a constraint to + * have the same name as the underlying index; therefore, use the index's + * existing name as the default constraint name, and if the user explicitly + * gives some other name for the constraint, rename the index to match. + */ + constraintName = stmt->idxname; + if (constraintName == NULL) + constraintName = indexName; + else if (strcmp(constraintName, indexName) != 0) + { + ereport(NOTICE, + (errmsg("ALTER TABLE / ADD CONSTRAINT USING INDEX will rename index \"%s\" to \"%s\"", + indexName, constraintName))); + RenameRelation(index_oid, constraintName, OBJECT_INDEX); + } + + /* Extra checks needed if making primary key */ + if (stmt->primary) + index_check_primary_key(rel, indexInfo, true); + + /* Note we currently don't support EXCLUSION constraints here */ + if (stmt->primary) + constraintType = CONSTRAINT_PRIMARY; + else + constraintType = CONSTRAINT_UNIQUE; + + /* Create the catalog entries for the constraint */ + index_constraint_create(rel, + index_oid, + indexInfo, + constraintName, + constraintType, + stmt->deferrable, + stmt->initdeferred, + stmt->primary, + true, + allowSystemTableMods); + + index_close(indexRel, NoLock); +} + /* * ALTER TABLE ADD CONSTRAINT */ diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index fb9da8342d..662916d210 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -2233,6 +2233,7 @@ _copyConstraint(Constraint *from) COPY_NODE_FIELD(keys); COPY_NODE_FIELD(exclusions); COPY_NODE_FIELD(options); + COPY_STRING_FIELD(indexname); COPY_STRING_FIELD(indexspace); COPY_STRING_FIELD(access_method); COPY_NODE_FIELD(where_clause); @@ -2705,6 +2706,7 @@ _copyIndexStmt(IndexStmt *from) COPY_NODE_FIELD(options); COPY_NODE_FIELD(whereClause); COPY_NODE_FIELD(excludeOpNames); + COPY_SCALAR_FIELD(indexOid); COPY_SCALAR_FIELD(unique); COPY_SCALAR_FIELD(primary); COPY_SCALAR_FIELD(isconstraint); diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 2ef1a33bb7..b7dc450447 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -1212,6 +1212,7 @@ _equalIndexStmt(IndexStmt *a, IndexStmt *b) COMPARE_NODE_FIELD(options); COMPARE_NODE_FIELD(whereClause); COMPARE_NODE_FIELD(excludeOpNames); + COMPARE_SCALAR_FIELD(indexOid); COMPARE_SCALAR_FIELD(unique); COMPARE_SCALAR_FIELD(primary); COMPARE_SCALAR_FIELD(isconstraint); @@ -2181,6 +2182,7 @@ _equalConstraint(Constraint *a, Constraint *b) COMPARE_NODE_FIELD(keys); COMPARE_NODE_FIELD(exclusions); COMPARE_NODE_FIELD(options); + COMPARE_STRING_FIELD(indexname); COMPARE_STRING_FIELD(indexspace); COMPARE_STRING_FIELD(access_method); COMPARE_NODE_FIELD(where_clause); diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 3c2ce10ee5..c8eccce5a7 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -1877,6 +1877,7 @@ _outIndexStmt(StringInfo str, IndexStmt *node) WRITE_NODE_FIELD(options); WRITE_NODE_FIELD(whereClause); WRITE_NODE_FIELD(excludeOpNames); + WRITE_OID_FIELD(indexOid); WRITE_BOOL_FIELD(unique); WRITE_BOOL_FIELD(primary); WRITE_BOOL_FIELD(isconstraint); @@ -2474,6 +2475,7 @@ _outConstraint(StringInfo str, Constraint *node) appendStringInfo(str, "PRIMARY_KEY"); WRITE_NODE_FIELD(keys); WRITE_NODE_FIELD(options); + WRITE_STRING_FIELD(indexname); WRITE_STRING_FIELD(indexspace); /* access_method and where_clause not currently used */ break; @@ -2482,6 +2484,7 @@ _outConstraint(StringInfo str, Constraint *node) appendStringInfo(str, "UNIQUE"); WRITE_NODE_FIELD(keys); WRITE_NODE_FIELD(options); + WRITE_STRING_FIELD(indexname); WRITE_STRING_FIELD(indexspace); /* access_method and where_clause not currently used */ break; @@ -2490,6 +2493,7 @@ _outConstraint(StringInfo str, Constraint *node) appendStringInfo(str, "EXCLUSION"); WRITE_NODE_FIELD(exclusions); WRITE_NODE_FIELD(options); + WRITE_STRING_FIELD(indexname); WRITE_STRING_FIELD(indexspace); WRITE_STRING_FIELD(access_method); WRITE_NODE_FIELD(where_clause); diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 660947c8f8..456db5c50e 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -422,6 +422,7 @@ static RangeVar *makeRangeVarFromAnyName(List *names, int position, core_yyscan_ %type key_actions key_delete key_match key_update key_action %type ConstraintAttributeSpec ConstraintDeferrabilitySpec ConstraintTimeSpec +%type ExistingIndex %type constraints_set_list %type constraints_set_mode @@ -2501,6 +2502,7 @@ ColConstraintElem: n->location = @1; n->keys = NULL; n->options = $2; + n->indexname = NULL; n->indexspace = $3; $$ = (Node *)n; } @@ -2511,6 +2513,7 @@ ColConstraintElem: n->location = @1; n->keys = NULL; n->options = $3; + n->indexname = NULL; n->indexspace = $4; $$ = (Node *)n; } @@ -2665,11 +2668,25 @@ ConstraintElem: n->location = @1; n->keys = $3; n->options = $5; + n->indexname = NULL; n->indexspace = $6; n->deferrable = ($7 & 1) != 0; n->initdeferred = ($7 & 2) != 0; $$ = (Node *)n; } + | UNIQUE ExistingIndex ConstraintAttributeSpec + { + Constraint *n = makeNode(Constraint); + n->contype = CONSTR_UNIQUE; + n->location = @1; + n->keys = NIL; + n->options = NIL; + n->indexname = $2; + n->indexspace = NULL; + n->deferrable = ($3 & 1) != 0; + n->initdeferred = ($3 & 2) != 0; + $$ = (Node *)n; + } | PRIMARY KEY '(' columnList ')' opt_definition OptConsTableSpace ConstraintAttributeSpec { @@ -2678,11 +2695,25 @@ ConstraintElem: n->location = @1; n->keys = $4; n->options = $6; + n->indexname = NULL; n->indexspace = $7; n->deferrable = ($8 & 1) != 0; n->initdeferred = ($8 & 2) != 0; $$ = (Node *)n; } + | PRIMARY KEY ExistingIndex ConstraintAttributeSpec + { + Constraint *n = makeNode(Constraint); + n->contype = CONSTR_PRIMARY; + n->location = @1; + n->keys = NIL; + n->options = NIL; + n->indexname = $3; + n->indexspace = NULL; + n->deferrable = ($4 & 1) != 0; + n->initdeferred = ($4 & 2) != 0; + $$ = (Node *)n; + } | EXCLUDE access_method_clause '(' ExclusionConstraintList ')' opt_definition OptConsTableSpace ExclusionWhereClause ConstraintAttributeSpec @@ -2693,6 +2724,7 @@ ConstraintElem: n->access_method = $2; n->exclusions = $4; n->options = $6; + n->indexname = NULL; n->indexspace = $7; n->where_clause = $8; n->deferrable = ($9 & 1) != 0; @@ -2837,6 +2869,9 @@ OptConsTableSpace: USING INDEX TABLESPACE name { $$ = $4; } | /*EMPTY*/ { $$ = NULL; } ; +ExistingIndex: USING INDEX index_name { $$ = $3; } + ; + /* * Note: CREATE TABLE ... AS SELECT ... is just another spelling for @@ -5230,6 +5265,7 @@ IndexStmt: CREATE opt_unique INDEX opt_concurrently opt_index_name n->options = $12; n->tableSpace = $13; n->whereClause = $14; + n->indexOid = InvalidOid; $$ = (Node *)n; } ; diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c index 23c60eec31..10f52954c1 100644 --- a/src/backend/parser/parse_utilcmd.c +++ b/src/backend/parser/parse_utilcmd.c @@ -65,6 +65,7 @@ /* State shared by transformCreateStmt and its subroutines */ typedef struct { + ParseState *pstate; /* overall parser state */ const char *stmtType; /* "CREATE [FOREIGN] TABLE" or "ALTER TABLE" */ RangeVar *relation; /* relation to create */ Relation rel; /* opened/locked rel, if ALTER */ @@ -98,30 +99,27 @@ typedef struct } CreateSchemaStmtContext; -static void transformColumnDefinition(ParseState *pstate, - CreateStmtContext *cxt, +static void transformColumnDefinition(CreateStmtContext *cxt, ColumnDef *column); -static void transformTableConstraint(ParseState *pstate, - CreateStmtContext *cxt, +static void transformTableConstraint(CreateStmtContext *cxt, Constraint *constraint); -static void transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, +static void transformInhRelation(CreateStmtContext *cxt, InhRelation *inhrelation); -static void transformOfType(ParseState *pstate, CreateStmtContext *cxt, +static void transformOfType(CreateStmtContext *cxt, TypeName *ofTypename); static char *chooseIndexName(const RangeVar *relation, IndexStmt *index_stmt); static IndexStmt *generateClonedIndexStmt(CreateStmtContext *cxt, Relation parent_index, AttrNumber *attmap); static List *get_opclass(Oid opclass, Oid actual_datatype); -static void transformIndexConstraints(ParseState *pstate, - CreateStmtContext *cxt); +static void transformIndexConstraints(CreateStmtContext *cxt); static IndexStmt *transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt); -static void transformFKConstraints(ParseState *pstate, - CreateStmtContext *cxt, +static void transformFKConstraints(CreateStmtContext *cxt, bool skipValidation, bool isAddConstraint); -static void transformConstraintAttrs(ParseState *pstate, List *constraintList); -static void transformColumnType(ParseState *pstate, ColumnDef *column); +static void transformConstraintAttrs(CreateStmtContext *cxt, + List *constraintList); +static void transformColumnType(CreateStmtContext *cxt, ColumnDef *column); static void setSchemaName(char *context_schema, char **stmt_schema_name); @@ -169,10 +167,11 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString) stmt->relation->schemaname = get_namespace_name(namespaceid); } - /* Set up pstate */ + /* Set up pstate and CreateStmtContext */ pstate = make_parsestate(NULL); pstate->p_sourcetext = queryString; + cxt.pstate = pstate; if (IsA(stmt, CreateForeignTableStmt)) cxt.stmtType = "CREATE FOREIGN TABLE"; else @@ -194,7 +193,7 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString) Assert(!stmt->ofTypename || !stmt->inhRelations); /* grammar enforces */ if (stmt->ofTypename) - transformOfType(pstate, &cxt, stmt->ofTypename); + transformOfType(&cxt, stmt->ofTypename); /* * Run through each primary element in the table creation clause. Separate @@ -207,18 +206,15 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString) switch (nodeTag(element)) { case T_ColumnDef: - transformColumnDefinition(pstate, &cxt, - (ColumnDef *) element); + transformColumnDefinition(&cxt, (ColumnDef *) element); break; case T_Constraint: - transformTableConstraint(pstate, &cxt, - (Constraint *) element); + transformTableConstraint(&cxt, (Constraint *) element); break; case T_InhRelation: - transformInhRelation(pstate, &cxt, - (InhRelation *) element); + transformInhRelation(&cxt, (InhRelation *) element); break; default: @@ -240,12 +236,12 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString) /* * Postprocess constraints that give rise to index definitions. */ - transformIndexConstraints(pstate, &cxt); + transformIndexConstraints(&cxt); /* * Postprocess foreign-key constraints. */ - transformFKConstraints(pstate, &cxt, true, false); + transformFKConstraints(&cxt, true, false); /* * Output results. @@ -266,8 +262,7 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString) * Also used in ALTER TABLE ADD COLUMN */ static void -transformColumnDefinition(ParseState *pstate, CreateStmtContext *cxt, - ColumnDef *column) +transformColumnDefinition(CreateStmtContext *cxt, ColumnDef *column) { bool is_serial; bool saw_nullable; @@ -309,12 +304,13 @@ transformColumnDefinition(ParseState *pstate, CreateStmtContext *cxt, ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("array of serial is not implemented"), - parser_errposition(pstate, column->typeName->location))); + parser_errposition(cxt->pstate, + column->typeName->location))); } /* Do necessary work on the column type declaration */ if (column->typeName) - transformColumnType(pstate, column); + transformColumnType(cxt, column); /* Special actions for SERIAL pseudo-types */ if (is_serial) @@ -437,7 +433,7 @@ transformColumnDefinition(ParseState *pstate, CreateStmtContext *cxt, } /* Process column constraints, if any... */ - transformConstraintAttrs(pstate, column->constraints); + transformConstraintAttrs(cxt, column->constraints); saw_nullable = false; saw_default = false; @@ -455,7 +451,7 @@ transformColumnDefinition(ParseState *pstate, CreateStmtContext *cxt, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting NULL/NOT NULL declarations for column \"%s\" of table \"%s\"", column->colname, cxt->relation->relname), - parser_errposition(pstate, + parser_errposition(cxt->pstate, constraint->location))); column->is_not_null = FALSE; saw_nullable = true; @@ -467,7 +463,7 @@ transformColumnDefinition(ParseState *pstate, CreateStmtContext *cxt, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting NULL/NOT NULL declarations for column \"%s\" of table \"%s\"", column->colname, cxt->relation->relname), - parser_errposition(pstate, + parser_errposition(cxt->pstate, constraint->location))); column->is_not_null = TRUE; saw_nullable = true; @@ -479,7 +475,7 @@ transformColumnDefinition(ParseState *pstate, CreateStmtContext *cxt, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("multiple default values specified for column \"%s\" of table \"%s\"", column->colname, cxt->relation->relname), - parser_errposition(pstate, + parser_errposition(cxt->pstate, constraint->location))); column->raw_default = constraint->raw_expr; Assert(constraint->cooked_expr == NULL); @@ -532,8 +528,7 @@ transformColumnDefinition(ParseState *pstate, CreateStmtContext *cxt, * transform a Constraint node within CREATE TABLE or ALTER TABLE */ static void -transformTableConstraint(ParseState *pstate, CreateStmtContext *cxt, - Constraint *constraint) +transformTableConstraint(CreateStmtContext *cxt, Constraint *constraint) { switch (constraint->contype) { @@ -577,8 +572,7 @@ transformTableConstraint(ParseState *pstate, CreateStmtContext *cxt, * . */ static void -transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, - InhRelation *inhRelation) +transformInhRelation(CreateStmtContext *cxt, InhRelation *inhRelation) { AttrNumber parent_attno; Relation relation; @@ -587,7 +581,8 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, AclResult aclresult; char *comment; - relation = parserOpenTable(pstate, inhRelation->relation, AccessShareLock); + relation = parserOpenTable(cxt->pstate, inhRelation->relation, + AccessShareLock); if (relation->rd_rel->relkind != RELKIND_RELATION) ereport(ERROR, @@ -816,7 +811,7 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, } static void -transformOfType(ParseState *pstate, CreateStmtContext *cxt, TypeName *ofTypename) +transformOfType(CreateStmtContext *cxt, TypeName *ofTypename) { HeapTuple tuple; Form_pg_type typ; @@ -937,6 +932,7 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx, index->tableSpace = get_tablespace_name(idxrelrec->reltablespace); else index->tableSpace = NULL; + index->indexOid = InvalidOid; index->unique = idxrec->indisunique; index->primary = idxrec->indisprimary; index->concurrent = false; @@ -1181,7 +1177,7 @@ get_opclass(Oid opclass, Oid actual_datatype) * LIKE ... INCLUDING INDEXES. */ static void -transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt) +transformIndexConstraints(CreateStmtContext *cxt) { IndexStmt *index; List *indexlist = NIL; @@ -1304,7 +1300,8 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt) ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("multiple primary keys for table \"%s\" are not allowed", - cxt->relation->relname))); + cxt->relation->relname), + parser_errposition(cxt->pstate, constraint->location))); cxt->pkey = index; /* @@ -1328,8 +1325,182 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt) index->whereClause = constraint->where_clause; index->indexParams = NIL; index->excludeOpNames = NIL; + index->indexOid = InvalidOid; index->concurrent = false; + /* + * If it's ALTER TABLE ADD CONSTRAINT USING INDEX, look up the index and + * verify it's usable, then extract the implied column name list. (We + * will not actually need the column name list at runtime, but we need + * it now to check for duplicate column entries below.) + */ + if (constraint->indexname != NULL) + { + char *index_name = constraint->indexname; + Relation heap_rel = cxt->rel; + Oid index_oid; + Relation index_rel; + Form_pg_index index_form; + oidvector *indclass; + Datum indclassDatum; + bool isnull; + int i; + + /* Grammar should not allow this with explicit column list */ + Assert(constraint->keys == NIL); + + /* Grammar should only allow PRIMARY and UNIQUE constraints */ + Assert(constraint->contype == CONSTR_PRIMARY || + constraint->contype == CONSTR_UNIQUE); + + /* Must be ALTER, not CREATE, but grammar doesn't enforce that */ + if (!cxt->isalter) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use an existing index in CREATE TABLE"), + parser_errposition(cxt->pstate, constraint->location))); + + /* Look for the index in the same schema as the table */ + index_oid = get_relname_relid(index_name, RelationGetNamespace(heap_rel)); + + if (!OidIsValid(index_oid)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("index \"%s\" does not exist", index_name), + parser_errposition(cxt->pstate, constraint->location))); + + /* Open the index (this will throw an error if it is not an index) */ + index_rel = index_open(index_oid, AccessShareLock); + index_form = index_rel->rd_index; + + /* Check that it does not have an associated constraint already */ + if (OidIsValid(get_index_constraint(index_oid))) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("index \"%s\" is already associated with a constraint", + index_name), + parser_errposition(cxt->pstate, constraint->location))); + + /* Perform validity checks on the index */ + if (index_form->indrelid != RelationGetRelid(heap_rel)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("index \"%s\" does not belong to table \"%s\"", + index_name, RelationGetRelationName(heap_rel)), + parser_errposition(cxt->pstate, constraint->location))); + + if (!index_form->indisvalid) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("index \"%s\" is not valid", index_name), + parser_errposition(cxt->pstate, constraint->location))); + + if (!index_form->indisready) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("index \"%s\" is not ready", index_name), + parser_errposition(cxt->pstate, constraint->location))); + + if (!index_form->indisunique) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a unique index", index_name), + errdetail("Cannot create a PRIMARY KEY or UNIQUE constraint using such an index."), + parser_errposition(cxt->pstate, constraint->location))); + + if (RelationGetIndexExpressions(index_rel) != NIL) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("index \"%s\" contains expressions", index_name), + errdetail("Cannot create a PRIMARY KEY or UNIQUE constraint using such an index."), + parser_errposition(cxt->pstate, constraint->location))); + + if (RelationGetIndexPredicate(index_rel) != NIL) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is a partial index", index_name), + errdetail("Cannot create a PRIMARY KEY or UNIQUE constraint using such an index."), + parser_errposition(cxt->pstate, constraint->location))); + + /* + * It's probably unsafe to change a deferred index to non-deferred. + * (A non-constraint index couldn't be deferred anyway, so this case + * should never occur; no need to sweat, but let's check it.) + */ + if (!index_form->indimmediate && !constraint->deferrable) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is a deferrable index", index_name), + errdetail("Cannot create a non-deferrable constraint using a deferrable index."), + parser_errposition(cxt->pstate, constraint->location))); + + /* + * Insist on it being a btree. That's the only kind that supports + * uniqueness at the moment anyway; but we must have an index that + * exactly matches what you'd get from plain ADD CONSTRAINT syntax, + * else dump and reload will produce a different index (breaking + * pg_upgrade in particular). + */ + if (index_rel->rd_rel->relam != get_am_oid(DEFAULT_INDEX_TYPE, false)) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("index \"%s\" is not a b-tree", index_name), + parser_errposition(cxt->pstate, constraint->location))); + + /* Must get indclass the hard way */ + indclassDatum = SysCacheGetAttr(INDEXRELID, index_rel->rd_indextuple, + Anum_pg_index_indclass, &isnull); + Assert(!isnull); + indclass = (oidvector *) DatumGetPointer(indclassDatum); + + for (i = 0; i < index_form->indnatts; i++) + { + int2 attnum = index_form->indkey.values[i]; + Form_pg_attribute attform; + char *attname; + Oid defopclass; + + /* + * We shouldn't see attnum == 0 here, since we already rejected + * expression indexes. If we do, SystemAttributeDefinition + * will throw an error. + */ + if (attnum > 0) + { + Assert(attnum <= heap_rel->rd_att->natts); + attform = heap_rel->rd_att->attrs[attnum - 1]; + } + else + attform = SystemAttributeDefinition(attnum, + heap_rel->rd_rel->relhasoids); + attname = pstrdup(NameStr(attform->attname)); + + /* + * Insist on default opclass and sort options. While the index + * would still work as a constraint with non-default settings, it + * might not provide exactly the same uniqueness semantics as + * you'd get from a normally-created constraint; and there's also + * the dump/reload problem mentioned above. + */ + defopclass = GetDefaultOpClass(attform->atttypid, + index_rel->rd_rel->relam); + if (indclass->values[i] != defopclass || + index_rel->rd_indoption[i] != 0) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("index \"%s\" does not have default sorting behavior", index_name), + errdetail("Cannot create a PRIMARY KEY or UNIQUE constraint using such an index."), + parser_errposition(cxt->pstate, constraint->location))); + + constraint->keys = lappend(constraint->keys, makeString(attname)); + } + + /* Close the index relation but keep the lock */ + relation_close(index_rel, NoLock); + + index->indexOid = index_oid; + } + /* * If it's an EXCLUDE constraint, the grammar returns a list of pairs of * IndexElems and operator names. We have to break that apart into @@ -1450,8 +1621,8 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt) if (!found && !cxt->isalter) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN), - errmsg("column \"%s\" named in key does not exist", - key))); + errmsg("column \"%s\" named in key does not exist", key), + parser_errposition(cxt->pstate, constraint->location))); /* Check for PRIMARY KEY(foo, foo) */ foreach(columns, index->indexParams) @@ -1463,12 +1634,14 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt) ereport(ERROR, (errcode(ERRCODE_DUPLICATE_COLUMN), errmsg("column \"%s\" appears twice in primary key constraint", - key))); + key), + parser_errposition(cxt->pstate, constraint->location))); else ereport(ERROR, (errcode(ERRCODE_DUPLICATE_COLUMN), errmsg("column \"%s\" appears twice in unique constraint", - key))); + key), + parser_errposition(cxt->pstate, constraint->location))); } } @@ -1491,7 +1664,7 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt) * handle FOREIGN KEY constraints */ static void -transformFKConstraints(ParseState *pstate, CreateStmtContext *cxt, +transformFKConstraints(CreateStmtContext *cxt, bool skipValidation, bool isAddConstraint) { ListCell *fkclist; @@ -1978,7 +2151,7 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString) stmt = (AlterTableStmt *) copyObject(stmt); /* - * Assign the appropriate lock level for this list of subcommands. + * Determine the appropriate lock level for this list of subcommands. */ lockmode = AlterTableGetLockLevel(stmt->cmds); @@ -1992,10 +2165,11 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString) */ rel = relation_openrv(stmt->relation, lockmode); - /* Set up pstate */ + /* Set up pstate and CreateStmtContext */ pstate = make_parsestate(NULL); pstate->p_sourcetext = queryString; + cxt.pstate = pstate; cxt.stmtType = "ALTER TABLE"; cxt.relation = stmt->relation; cxt.rel = rel; @@ -2028,7 +2202,7 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString) ColumnDef *def = (ColumnDef *) cmd->def; Assert(IsA(def, ColumnDef)); - transformColumnDefinition(pstate, &cxt, def); + transformColumnDefinition(&cxt, def); /* * If the column has a non-null default, we can't skip @@ -2053,8 +2227,7 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString) */ if (IsA(cmd->def, Constraint)) { - transformTableConstraint(pstate, &cxt, - (Constraint *) cmd->def); + transformTableConstraint(&cxt, (Constraint *) cmd->def); if (((Constraint *) cmd->def)->contype == CONSTR_FOREIGN) skipValidation = false; } @@ -2088,25 +2261,25 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString) cxt.alist = NIL; /* Postprocess index and FK constraints */ - transformIndexConstraints(pstate, &cxt); + transformIndexConstraints(&cxt); - transformFKConstraints(pstate, &cxt, skipValidation, true); + transformFKConstraints(&cxt, skipValidation, true); /* * Push any index-creation commands into the ALTER, so that they can be * scheduled nicely by tablecmds.c. Note that tablecmds.c assumes that - * the IndexStmt attached to an AT_AddIndex subcommand has already been - * through transformIndexStmt. + * the IndexStmt attached to an AT_AddIndex or AT_AddIndexConstraint + * subcommand has already been through transformIndexStmt. */ foreach(l, cxt.alist) { - Node *idxstmt = (Node *) lfirst(l); + IndexStmt *idxstmt = (IndexStmt *) lfirst(l); Assert(IsA(idxstmt, IndexStmt)); + idxstmt = transformIndexStmt(idxstmt, queryString); newcmd = makeNode(AlterTableCmd); - newcmd->subtype = AT_AddIndex; - newcmd->def = (Node *) transformIndexStmt((IndexStmt *) idxstmt, - queryString); + newcmd->subtype = OidIsValid(idxstmt->indexOid) ? AT_AddIndexConstraint : AT_AddIndex; + newcmd->def = (Node *) idxstmt; newcmds = lappend(newcmds, newcmd); } cxt.alist = NIL; @@ -2153,7 +2326,7 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString) * for other constraint types. */ static void -transformConstraintAttrs(ParseState *pstate, List *constraintList) +transformConstraintAttrs(CreateStmtContext *cxt, List *constraintList) { Constraint *lastprimarycon = NULL; bool saw_deferrability = false; @@ -2181,12 +2354,12 @@ transformConstraintAttrs(ParseState *pstate, List *constraintList) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("misplaced DEFERRABLE clause"), - parser_errposition(pstate, con->location))); + parser_errposition(cxt->pstate, con->location))); if (saw_deferrability) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("multiple DEFERRABLE/NOT DEFERRABLE clauses not allowed"), - parser_errposition(pstate, con->location))); + parser_errposition(cxt->pstate, con->location))); saw_deferrability = true; lastprimarycon->deferrable = true; break; @@ -2196,12 +2369,12 @@ transformConstraintAttrs(ParseState *pstate, List *constraintList) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("misplaced NOT DEFERRABLE clause"), - parser_errposition(pstate, con->location))); + parser_errposition(cxt->pstate, con->location))); if (saw_deferrability) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("multiple DEFERRABLE/NOT DEFERRABLE clauses not allowed"), - parser_errposition(pstate, con->location))); + parser_errposition(cxt->pstate, con->location))); saw_deferrability = true; lastprimarycon->deferrable = false; if (saw_initially && @@ -2209,7 +2382,7 @@ transformConstraintAttrs(ParseState *pstate, List *constraintList) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("constraint declared INITIALLY DEFERRED must be DEFERRABLE"), - parser_errposition(pstate, con->location))); + parser_errposition(cxt->pstate, con->location))); break; case CONSTR_ATTR_DEFERRED: @@ -2217,12 +2390,12 @@ transformConstraintAttrs(ParseState *pstate, List *constraintList) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("misplaced INITIALLY DEFERRED clause"), - parser_errposition(pstate, con->location))); + parser_errposition(cxt->pstate, con->location))); if (saw_initially) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("multiple INITIALLY IMMEDIATE/DEFERRED clauses not allowed"), - parser_errposition(pstate, con->location))); + parser_errposition(cxt->pstate, con->location))); saw_initially = true; lastprimarycon->initdeferred = true; @@ -2235,7 +2408,7 @@ transformConstraintAttrs(ParseState *pstate, List *constraintList) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("constraint declared INITIALLY DEFERRED must be DEFERRABLE"), - parser_errposition(pstate, con->location))); + parser_errposition(cxt->pstate, con->location))); break; case CONSTR_ATTR_IMMEDIATE: @@ -2243,12 +2416,12 @@ transformConstraintAttrs(ParseState *pstate, List *constraintList) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("misplaced INITIALLY IMMEDIATE clause"), - parser_errposition(pstate, con->location))); + parser_errposition(cxt->pstate, con->location))); if (saw_initially) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("multiple INITIALLY IMMEDIATE/DEFERRED clauses not allowed"), - parser_errposition(pstate, con->location))); + parser_errposition(cxt->pstate, con->location))); saw_initially = true; lastprimarycon->initdeferred = false; break; @@ -2268,12 +2441,12 @@ transformConstraintAttrs(ParseState *pstate, List *constraintList) * Special handling of type definition for a column */ static void -transformColumnType(ParseState *pstate, ColumnDef *column) +transformColumnType(CreateStmtContext *cxt, ColumnDef *column) { /* * All we really need to do here is verify that the type is valid. */ - Type ctype = typenameType(pstate, column->typeName, NULL); + Type ctype = typenameType(cxt->pstate, column->typeName, NULL); ReleaseSysCache(ctype); } diff --git a/src/include/catalog/index.h b/src/include/catalog/index.h index 29c471703c..60387cca01 100644 --- a/src/include/catalog/index.h +++ b/src/include/catalog/index.h @@ -28,7 +28,11 @@ typedef void (*IndexBuildCallback) (Relation index, void *state); -extern Oid index_create(Oid heapRelationId, +extern void index_check_primary_key(Relation heapRel, + IndexInfo *indexInfo, + bool is_alter_table); + +extern Oid index_create(Relation heapRelation, const char *indexRelationName, Oid indexRelationId, IndexInfo *indexInfo, @@ -46,6 +50,17 @@ extern Oid index_create(Oid heapRelationId, bool skip_build, bool concurrent); +extern void index_constraint_create(Relation heapRelation, + Oid indexRelationId, + IndexInfo *indexInfo, + const char *constraintName, + char constraintType, + bool deferrable, + bool initdeferred, + bool mark_as_primary, + bool update_pgindex, + bool allow_system_table_mods); + extern void index_drop(Oid indexId); extern IndexInfo *BuildIndexInfo(Relation index); diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 3d2ae991b7..483f22591e 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -1142,6 +1142,7 @@ typedef enum AlterTableType AT_AddConstraintRecurse, /* internal to commands/tablecmds.c */ AT_ProcessedConstraint, /* pre-processed add constraint (local in * parser/parse_utilcmd.c) */ + AT_AddIndexConstraint, /* add constraint using existing index */ AT_DropConstraint, /* drop constraint */ AT_DropConstraintRecurse, /* internal to commands/tablecmds.c */ AT_AlterColumnType, /* alter column type */ @@ -1477,6 +1478,7 @@ typedef struct Constraint /* Fields used for index constraints (UNIQUE, PRIMARY KEY, EXCLUSION): */ List *options; /* options from WITH clause */ + char *indexname; /* existing index to use; otherwise NULL */ char *indexspace; /* index tablespace; NULL for default */ /* These could be, but currently are not, used for UNIQUE/PKEY: */ char *access_method; /* index access method; NULL for default */ @@ -1953,6 +1955,12 @@ typedef struct FetchStmt /* ---------------------- * Create Index Statement + * + * This represents creation of an index and/or an associated constraint. + * If indexOid isn't InvalidOid, we are not creating an index, just a + * UNIQUE/PKEY constraint using an existing index. isconstraint must always + * be true in this case, and the fields describing the index properties are + * empty. * ---------------------- */ typedef struct IndexStmt @@ -1966,6 +1974,7 @@ typedef struct IndexStmt List *options; /* options from WITH clause */ Node *whereClause; /* qualification (partial-index predicate) */ List *excludeOpNames; /* exclusion operator names, or NIL if none */ + Oid indexOid; /* OID of an existing index, if any */ bool unique; /* is index unique? */ bool primary; /* is index on primary key? */ bool isconstraint; /* is it from a CONSTRAINT clause? */ diff --git a/src/test/regress/expected/create_index.out b/src/test/regress/expected/create_index.out index d586f69ebb..c78d9ee1e8 100644 --- a/src/test/regress/expected/create_index.out +++ b/src/test/regress/expected/create_index.out @@ -1210,6 +1210,43 @@ Indexes: DROP TABLE concur_heap; -- +-- Test ADD CONSTRAINT USING INDEX +-- +CREATE TABLE cwi_test( a int , b varchar(10), c char); +-- add some data so that all tests have something to work with. +INSERT INTO cwi_test VALUES(1, 2), (3, 4), (5, 6); +CREATE UNIQUE INDEX cwi_uniq_idx ON cwi_test(a , b); +ALTER TABLE cwi_test ADD primary key USING INDEX cwi_uniq_idx; +\d cwi_test + Table "public.cwi_test" + Column | Type | Modifiers +--------+-----------------------+----------- + a | integer | not null + b | character varying(10) | not null + c | character(1) | +Indexes: + "cwi_uniq_idx" PRIMARY KEY, btree (a, b) + +CREATE UNIQUE INDEX cwi_uniq2_idx ON cwi_test(b , a); +ALTER TABLE cwi_test DROP CONSTRAINT cwi_uniq_idx, + ADD CONSTRAINT cwi_replaced_pkey PRIMARY KEY + USING INDEX cwi_uniq2_idx; +NOTICE: ALTER TABLE / ADD CONSTRAINT USING INDEX will rename index "cwi_uniq2_idx" to "cwi_replaced_pkey" +\d cwi_test + Table "public.cwi_test" + Column | Type | Modifiers +--------+-----------------------+----------- + a | integer | not null + b | character varying(10) | not null + c | character(1) | +Indexes: + "cwi_replaced_pkey" PRIMARY KEY, btree (b, a) + +DROP INDEX cwi_replaced_pkey; -- Should fail; a constraint depends on it +ERROR: cannot drop index cwi_replaced_pkey because constraint cwi_replaced_pkey on table cwi_test requires it +HINT: You can drop constraint cwi_replaced_pkey on table cwi_test instead. +DROP TABLE cwi_test; +-- -- Tests for IS NULL/IS NOT NULL with b-tree indexes -- SELECT unique1, unique2 INTO onek_with_null FROM onek; diff --git a/src/test/regress/sql/create_index.sql b/src/test/regress/sql/create_index.sql index 97c1beb358..31b49ca227 100644 --- a/src/test/regress/sql/create_index.sql +++ b/src/test/regress/sql/create_index.sql @@ -409,6 +409,32 @@ COMMIT; DROP TABLE concur_heap; +-- +-- Test ADD CONSTRAINT USING INDEX +-- + +CREATE TABLE cwi_test( a int , b varchar(10), c char); + +-- add some data so that all tests have something to work with. + +INSERT INTO cwi_test VALUES(1, 2), (3, 4), (5, 6); + +CREATE UNIQUE INDEX cwi_uniq_idx ON cwi_test(a , b); +ALTER TABLE cwi_test ADD primary key USING INDEX cwi_uniq_idx; + +\d cwi_test + +CREATE UNIQUE INDEX cwi_uniq2_idx ON cwi_test(b , a); +ALTER TABLE cwi_test DROP CONSTRAINT cwi_uniq_idx, + ADD CONSTRAINT cwi_replaced_pkey PRIMARY KEY + USING INDEX cwi_uniq2_idx; + +\d cwi_test + +DROP INDEX cwi_replaced_pkey; -- Should fail; a constraint depends on it + +DROP TABLE cwi_test; + -- -- Tests for IS NULL/IS NOT NULL with b-tree indexes --