diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c index ea5e4f67e9..4fd887acde 100644 --- a/src/backend/commands/indexcmds.c +++ b/src/backend/commands/indexcmds.c @@ -822,44 +822,75 @@ DefineIndex(Oid relationId, /* * If this table is partitioned and we're creating a unique index or a - * primary key, make sure that the indexed columns are part of the - * partition key. Otherwise it would be possible to violate uniqueness by - * putting values that ought to be unique in different partitions. + * primary key, make sure that the partition key is a subset of the + * index's columns. Otherwise it would be possible to violate uniqueness + * by putting values that ought to be unique in different partitions. * * We could lift this limitation if we had global indexes, but those have * their own problems, so this is a useful feature combination. */ if (partitioned && (stmt->unique || stmt->primary)) { - PartitionKey key = rel->rd_partkey; + PartitionKey key = RelationGetPartitionKey(rel); + const char *constraint_type; int i; + if (stmt->primary) + constraint_type = "PRIMARY KEY"; + else if (stmt->unique) + constraint_type = "UNIQUE"; + else if (stmt->excludeOpNames != NIL) + constraint_type = "EXCLUDE"; + else + { + elog(ERROR, "unknown constraint type"); + constraint_type = NULL; /* keep compiler quiet */ + } + /* - * A partitioned table can have unique indexes, as long as all the - * columns in the partition key appear in the unique key. A - * partition-local index can enforce global uniqueness iff the PK - * value completely determines the partition that a row is in. - * - * Thus, verify that all the columns in the partition key appear in - * the unique key definition. + * Verify that all the columns in the partition key appear in the + * unique key definition, with the same notion of equality. */ for (i = 0; i < key->partnatts; i++) { bool found = false; + int eq_strategy; + Oid ptkey_eqop; int j; - const char *constraint_type; - if (stmt->primary) - constraint_type = "PRIMARY KEY"; - else if (stmt->unique) - constraint_type = "UNIQUE"; - else if (stmt->excludeOpNames != NIL) - constraint_type = "EXCLUDE"; + /* + * Identify the equality operator associated with this partkey + * column. For list and range partitioning, partkeys use btree + * operator classes; hash partitioning uses hash operator classes. + * (Keep this in sync with ComputePartitionAttrs!) + */ + if (key->strategy == PARTITION_STRATEGY_HASH) + eq_strategy = HTEqualStrategyNumber; else - { - elog(ERROR, "unknown constraint type"); - constraint_type = NULL; /* keep compiler quiet */ - } + eq_strategy = BTEqualStrategyNumber; + + ptkey_eqop = get_opfamily_member(key->partopfamily[i], + key->partopcintype[i], + key->partopcintype[i], + eq_strategy); + if (!OidIsValid(ptkey_eqop)) + elog(ERROR, "missing operator %d(%u,%u) in partition opfamily %u", + eq_strategy, key->partopcintype[i], key->partopcintype[i], + key->partopfamily[i]); + + /* + * We'll need to be able to identify the equality operators + * associated with index columns, too. We know what to do with + * btree opclasses; if there are ever any other index types that + * support unique indexes, this logic will need extension. + */ + if (accessMethodId == BTREE_AM_OID) + eq_strategy = BTEqualStrategyNumber; + else + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot match partition key to an index using access method \"%s\"", + accessMethodName))); /* * It may be possible to support UNIQUE constraints when partition @@ -873,19 +904,40 @@ DefineIndex(Oid relationId, errdetail("%s constraints cannot be used when partition keys include expressions.", constraint_type))); + /* Search the index column(s) for a match */ for (j = 0; j < indexInfo->ii_NumIndexKeyAttrs; j++) { if (key->partattrs[i] == indexInfo->ii_IndexAttrNumbers[j]) { - found = true; - break; + /* Matched the column, now what about the equality op? */ + Oid idx_opfamily; + Oid idx_opcintype; + + if (get_opclass_opfamily_and_input_type(classObjectId[j], + &idx_opfamily, + &idx_opcintype)) + { + Oid idx_eqop; + + idx_eqop = get_opfamily_member(idx_opfamily, + idx_opcintype, + idx_opcintype, + eq_strategy); + if (ptkey_eqop == idx_eqop) + { + found = true; + break; + } + } } } + if (!found) { Form_pg_attribute att; - att = TupleDescAttr(RelationGetDescr(rel), key->partattrs[i] - 1); + att = TupleDescAttr(RelationGetDescr(rel), + key->partattrs[i] - 1); ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("insufficient columns in %s constraint definition",