Partial code review for ALTER DOMAIN patch. Incorporates Rod Taylor's

patches of 9-Dec (permissions fix) and 13-Dec (performance) as well as
a partial fix for locking issues: concurrent DROP COLUMN should not
create trouble anymore.  But concurrent DROP TABLE is still a risk, and
there is no protection at all against creating a column of a domain while
we are altering the domain.
This commit is contained in:
Tom Lane 2003-01-04 00:46:08 +00:00
parent 150ffb2d50
commit 17194f4112
2 changed files with 190 additions and 156 deletions

View File

@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/commands/typecmds.c,v 1.25 2002/12/15 16:17:43 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/commands/typecmds.c,v 1.26 2003/01/04 00:46:08 tgl Exp $
*
* DESCRIPTION
* The "DefineFoo" routines take the parse tree and pick out the
@ -39,6 +39,7 @@
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_depend.h"
#include "catalog/pg_type.h"
#include "commands/defrem.h"
#include "commands/tablecmds.h"
@ -59,20 +60,25 @@
#include "utils/lsyscache.h"
#include "utils/syscache.h"
/* result structure for get_rels_with_domain() */
typedef struct
{
Relation rel; /* opened and locked relation */
int natts; /* number of attributes of interest */
int *atts; /* attribute numbers */
/* atts[] is of allocated length RelationGetNumberOfAttributes(rel) */
} RelToCheck;
static Oid findTypeIOFunction(List *procname, Oid typeOid, bool isOutput);
static List *get_rels_with_domain(Oid domainOid);
static void domainPermissionCheck(HeapTuple tup, TypeName *typename);
static List *get_rels_with_domain(Oid domainOid, LOCKMODE lockmode);
static void domainOwnerCheck(HeapTuple tup, TypeName *typename);
static char *domainAddConstraint(Oid domainOid, Oid domainNamespace,
Oid baseTypeOid,
int typMod, Constraint *constr,
int *counter, char *domainName);
typedef struct
{
Oid relOid;
int natts;
int *atts;
} relToCheck;
/*
* DefineType
@ -942,7 +948,7 @@ AlterDomainDefault(List *names, Node *defaultRaw)
TypeNameToString(typename));
/* Doesn't return if user isn't allowed to alter the domain */
domainPermissionCheck(tup, typename);
domainOwnerCheck(tup, typename);
/* Setup new tuple */
MemSet(new_record, (Datum) 0, sizeof(new_record));
@ -1029,12 +1035,8 @@ AlterDomainNotNull(List *names, bool notNull)
{
TypeName *typename;
Oid domainoid;
Relation typrel;
HeapTuple tup;
Relation rel;
Datum new_record[Natts_pg_type];
char new_record_nulls[Natts_pg_type];
char new_record_repl[Natts_pg_type];
HeapTuple newtuple;
Form_pg_type typTup;
/* Make a TypeName so we can use standard type lookup machinery */
@ -1044,9 +1046,9 @@ AlterDomainNotNull(List *names, bool notNull)
typename->arrayBounds = NIL;
/* Lock the type table */
rel = heap_openr(TypeRelationName, RowExclusiveLock);
typrel = heap_openr(TypeRelationName, RowExclusiveLock);
/* Use LookupTypeName here so that shell types can be removed. */
/* Use LookupTypeName here so that shell types can be found (why?). */
domainoid = LookupTypeName(typename);
if (!OidIsValid(domainoid))
elog(ERROR, "Type \"%s\" does not exist",
@ -1055,93 +1057,84 @@ AlterDomainNotNull(List *names, bool notNull)
tup = SearchSysCacheCopy(TYPEOID,
ObjectIdGetDatum(domainoid),
0, 0, 0);
if (!HeapTupleIsValid(tup))
elog(ERROR, "AlterDomain: type \"%s\" does not exist",
TypeNameToString(typename));
/* Doesn't return if user isn't allowed to alter the domain */
domainPermissionCheck(tup, typename);
typTup = (Form_pg_type) GETSTRUCT(tup);
/* Is the domain already set to the destination constraint? */
/* Doesn't return if user isn't allowed to alter the domain */
domainOwnerCheck(tup, typename);
/* Is the domain already set to the desired constraint? */
if (typTup->typnotnull == notNull)
elog(ERROR, "AlterDomain: %s is already set to %s",
{
elog(NOTICE, "AlterDomain: %s is already set to %s",
TypeNameToString(typename),
notNull ? "NOT NULL" : "NULL");
heap_close(typrel, RowExclusiveLock);
return;
}
/* Adding a NOT NULL constraint requires checking current domains */
/* Adding a NOT NULL constraint requires checking existing columns */
if (notNull)
{
List *rels;
List *rt;
/* Fetch relation list with attributes based on this domain */
rels = get_rels_with_domain(domainoid);
/* ShareLock is sufficient to prevent concurrent data changes */
rels = get_rels_with_domain(domainoid, ShareLock);
foreach (rt, rels)
{
Relation typrel;
HeapTuple tuple;
RelToCheck *rtc = (RelToCheck *) lfirst(rt);
Relation testrel = rtc->rel;
TupleDesc tupdesc = RelationGetDescr(testrel);
HeapScanDesc scan;
TupleDesc tupdesc;
relToCheck *rtc = (relToCheck *) lfirst(rt);
HeapTuple tuple;
/* Lock relation */
typrel = heap_open(rtc->relOid, ExclusiveLock);
tupdesc = RelationGetDescr(typrel);
/* Fetch tuples sequentially */
scan = heap_beginscan(typrel, SnapshotNow, 0, NULL);
/* Scan all tuples in this relation */
scan = heap_beginscan(testrel, SnapshotNow, 0, NULL);
while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
int i;
/* Test attributes */
/* Test attributes that are of the domain */
for (i = 0; i < rtc->natts; i++)
{
int attnum = rtc->atts[i];
Datum d;
bool isNull;
d = heap_getattr(tuple, rtc->atts[i], tupdesc, &isNull);
d = heap_getattr(tuple, attnum, tupdesc, &isNull);
if (isNull)
elog(ERROR, "ALTER DOMAIN: Relation \"%s\" Attribute \"%s\" "
"contains NULL values",
RelationGetRelationName(typrel),
NameStr(*attnumAttName(typrel, rtc->atts[i])));
elog(ERROR, "ALTER DOMAIN: Relation \"%s\" attribute \"%s\" contains NULL values",
RelationGetRelationName(testrel),
NameStr(tupdesc->attrs[attnum - 1]->attname));
}
}
heap_endscan(scan);
/* Release lock */
heap_close(typrel, NoLock);
/* Close each rel after processing, but keep lock */
heap_close(testrel, NoLock);
}
}
/*
* Okay to update pg_type row. We can scribble on typTup because it's
* a copy.
*/
typTup->typnotnull = notNull;
/* Setup new tuple */
MemSet(new_record, (Datum) 0, sizeof(new_record));
MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
MemSet(new_record_repl, ' ', sizeof(new_record_repl));
simple_heap_update(typrel, &tup->t_self, tup);
new_record[Anum_pg_type_typnotnull - 1] = BoolGetDatum(notNull);
new_record_repl[Anum_pg_type_typnotnull - 1] = 'r';
/* Build the new tuple */
newtuple = heap_modifytuple(tup, rel,
new_record, new_record_nulls, new_record_repl);
simple_heap_update(rel, &tup->t_self, newtuple);
CatalogUpdateIndexes(rel, newtuple);
CatalogUpdateIndexes(typrel, tup);
/* Clean up */
heap_close(rel, NoLock);
heap_freetuple(newtuple);
heap_freetuple(tup);
heap_close(typrel, RowExclusiveLock);
}
/*
@ -1186,7 +1179,7 @@ AlterDomainDropConstraint(List *names, const char *constrName, DropBehavior beha
TypeNameToString(typename));
/* Doesn't return if user isn't allowed to alter the domain */
domainPermissionCheck(tup, typename);
domainOwnerCheck(tup, typename);
/* Grab an appropriate lock on the pg_constraint relation */
conrel = heap_openr(ConstraintRelationName, RowExclusiveLock);
@ -1236,11 +1229,11 @@ AlterDomainAddConstraint(List *names, Node *newConstraint)
{
TypeName *typename;
Oid domainoid;
Relation typrel;
HeapTuple tup;
Relation rel;
Form_pg_type typTup;
List *rels;
List *rt;
Form_pg_type typTup;
EState *estate;
ExprContext *econtext;
char *ccbin;
@ -1256,9 +1249,9 @@ AlterDomainAddConstraint(List *names, Node *newConstraint)
typename->arrayBounds = NIL;
/* Lock the type table */
rel = heap_openr(TypeRelationName, RowExclusiveLock);
typrel = heap_openr(TypeRelationName, RowExclusiveLock);
/* Use LookupTypeName here so that shell types can be found. */
/* Use LookupTypeName here so that shell types can be found (why?). */
domainoid = LookupTypeName(typename);
if (!OidIsValid(domainoid))
elog(ERROR, "Type \"%s\" does not exist",
@ -1267,15 +1260,13 @@ AlterDomainAddConstraint(List *names, Node *newConstraint)
tup = SearchSysCacheCopy(TYPEOID,
ObjectIdGetDatum(domainoid),
0, 0, 0);
if (!HeapTupleIsValid(tup))
elog(ERROR, "AlterDomain: type \"%s\" does not exist",
TypeNameToString(typename));
typTup = (Form_pg_type) GETSTRUCT(tup);
/* Doesn't return if user isn't allowed to alter the domain */
domainPermissionCheck(tup, typename);
domainOwnerCheck(tup, typename);
/* Check for unsupported constraint types */
if (IsA(newConstraint, FkConstraint))
@ -1346,35 +1337,34 @@ AlterDomainAddConstraint(List *names, Node *newConstraint)
/* build execution state for expr */
exprstate = ExecPrepareExpr(expr, estate);
rels = get_rels_with_domain(domainoid);
/* Fetch relation list with attributes based on this domain */
/* ShareLock is sufficient to prevent concurrent data changes */
rels = get_rels_with_domain(domainoid, ShareLock);
foreach (rt, rels)
{
relToCheck *rtc = (relToCheck *) lfirst(rt);
Relation testrel;
TupleDesc tupdesc;
HeapTuple tuple;
RelToCheck *rtc = (RelToCheck *) lfirst(rt);
Relation testrel = rtc->rel;
TupleDesc tupdesc = RelationGetDescr(testrel);
HeapScanDesc scan;
HeapTuple tuple;
/* Lock relation against changes */
testrel = heap_open(rtc->relOid, ShareLock);
tupdesc = RelationGetDescr(testrel);
/* Scan through table */
/* Scan all tuples in this relation */
scan = heap_beginscan(testrel, SnapshotNow, 0, NULL);
while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
int i;
/* Loop through each attribute of the tuple with a domain */
/* Test attributes that are of the domain */
for (i = 0; i < rtc->natts; i++)
{
int attnum = rtc->atts[i];
Datum d;
bool isNull;
Datum conResult;
d = heap_getattr(tuple, rtc->atts[i], tupdesc, &isNull);
d = heap_getattr(tuple, attnum, tupdesc, &isNull);
econtext->domainValue_datum = d;
econtext->domainValue_isNull = isNull;
@ -1384,13 +1374,13 @@ AlterDomainAddConstraint(List *names, Node *newConstraint)
&isNull, NULL);
if (!isNull && !DatumGetBool(conResult))
elog(ERROR, "AlterDomainAddConstraint: Domain %s constraint %s failed",
NameStr(typTup->typname), constr->name);
elog(ERROR, "ALTER DOMAIN: Relation \"%s\" attribute \"%s\" contains values that fail the new constraint",
RelationGetRelationName(testrel),
NameStr(tupdesc->attrs[attnum - 1]->attname));
}
ResetExprContext(econtext);
}
heap_endscan(scan);
/* Hold relation lock till commit (XXX bad for concurrency) */
@ -1400,114 +1390,158 @@ AlterDomainAddConstraint(List *names, Node *newConstraint)
FreeExecutorState(estate);
/* Clean up */
heap_close(rel, NoLock);
heap_close(typrel, RowExclusiveLock);
}
/*
* get_rels_with_domain
*
* Fetch all relations / attributes which are using the domain
* while maintaining a RowExclusiveLock on the pg_attribute
* entries.
*
* The result is a list of RelToCheck structs, one for each distinct
* relation, each containing one or more attribute numbers that are of
* the domain type. We have opened each rel and acquired the specified lock
* type on it.
*
* XXX this is completely broken because there is no way to lock the domain
* to prevent columns from being added or dropped while our command runs.
* We can partially protect against column drops by locking relations as we
* come across them, but there is still a race condition (the window between
* seeing a pg_depend entry and acquiring lock on the relation it references).
* Also, holding locks on all these relations simultaneously creates a non-
* trivial risk of deadlock. We can minimize but not eliminate the deadlock
* risk by using the weakest suitable lock (ShareLock for most callers).
*
* XXX to support domains over domains, we'd need to make this smarter,
* or make its callers smarter, so that we could find columns of derived
* domains. Arrays of domains would be a problem too.
*
* Generally used for retrieving a list of tests when adding
* new constraints to a domain.
*/
List *
get_rels_with_domain(Oid domainOid)
static List *
get_rels_with_domain(Oid domainOid, LOCKMODE lockmode)
{
Relation classRel;
HeapTuple classTup;
Relation attRel;
HeapScanDesc classScan;
List *rels = NIL;
List *result = NIL;
Relation depRel;
ScanKeyData key[2];
SysScanDesc depScan;
HeapTuple depTup;
/*
* We need to lock the domain rows for the length of the transaction,
* but once all of the tables and the appropriate attributes are
* found we can relese the relation lock.
* We scan pg_depend to find those things that depend on the domain.
* (We assume we can ignore refobjsubid for a domain.)
*/
classRel = relation_openr(RelationRelationName, ExclusiveLock);
attRel = relation_openr(AttributeRelationName, RowExclusiveLock);
depRel = relation_openr(DependRelationName, AccessShareLock);
classScan = heap_beginscan(classRel, SnapshotNow, 0, NULL);
ScanKeyEntryInitialize(&key[0], 0x0,
Anum_pg_depend_refclassid, F_OIDEQ,
ObjectIdGetDatum(RelOid_pg_type));
ScanKeyEntryInitialize(&key[1], 0x0,
Anum_pg_depend_refobjid, F_OIDEQ,
ObjectIdGetDatum(domainOid));
/* Scan through pg_class for tables */
while ((classTup = heap_getnext(classScan, ForwardScanDirection)) != NULL)
depScan = systable_beginscan(depRel, DependReferenceIndex, true,
SnapshotNow, 2, key);
while (HeapTupleIsValid(depTup = systable_getnext(depScan)))
{
relToCheck *rtc = NULL;
int nkeys = 0;
HeapTuple attTup;
HeapScanDesc attScan;
ScanKeyData attKey[2];
Form_pg_class pg_class;
Form_pg_depend pg_depend = (Form_pg_depend) GETSTRUCT(depTup);
RelToCheck *rtc = NULL;
List *rellist;
Form_pg_attribute pg_att;
int ptr;
/* Get our pg_class struct */
pg_class = (Form_pg_class) GETSTRUCT(classTup);
/* Ignore dependees that aren't user columns of tables */
/* (we assume system columns are never of domain types) */
if (pg_depend->classid != RelOid_pg_class ||
pg_depend->objsubid <= 0)
continue;
/* Fetch attributes from pg_attribute for the relation of the type domainOid */
ScanKeyEntryInitialize(&attKey[nkeys++], 0, Anum_pg_attribute_attrelid,
F_OIDEQ, ObjectIdGetDatum(HeapTupleGetOid(classTup)));
ScanKeyEntryInitialize(&attKey[nkeys++], 0, Anum_pg_attribute_atttypid,
F_OIDEQ, ObjectIdGetDatum(domainOid));
/* Setup to scan pg_attribute */
attScan = heap_beginscan(attRel, SnapshotNow, nkeys, attKey);
/* Scan through pg_attribute for attributes based on the domain */
while ((attTup = heap_getnext(attScan, ForwardScanDirection)) != NULL)
/* See if we already have an entry for this relation */
foreach(rellist, result)
{
if (rtc == NULL)
{
/* First one found for this rel */
rtc = (relToCheck *)palloc(sizeof(relToCheck));
rtc->atts = (int *)palloc(sizeof(int) * pg_class->relnatts);
rtc->relOid = HeapTupleGetOid(classTup);
rtc->natts = 0;
rels = lcons((void *)rtc, rels);
}
RelToCheck *rt = (RelToCheck *) lfirst(rellist);
/* Now add the attribute */
rtc->atts[rtc->natts++] = ((Form_pg_attribute) GETSTRUCT(attTup))->attnum;
if (RelationGetRelid(rt->rel) == pg_depend->objid)
{
rtc = rt;
break;
}
}
heap_endscan(attScan);
if (rtc == NULL)
{
/* First attribute found for this relation */
Relation rel;
/* Acquire requested lock on relation */
rel = heap_open(pg_depend->objid, lockmode);
/* Build the RelToCheck entry with enough space for all atts */
rtc = (RelToCheck *) palloc(sizeof(RelToCheck));
rtc->rel = rel;
rtc->natts = 0;
rtc->atts = (int *) palloc(sizeof(int) * RelationGetNumberOfAttributes(rel));
result = lcons(rtc, result);
}
/*
* Confirm column has not been dropped, and is of the expected type.
* This defends against an ALTER DROP COLUMN occuring just before
* we acquired lock ... but if the whole table were dropped, we'd
* still have a problem.
*/
if (pg_depend->objsubid > RelationGetNumberOfAttributes(rtc->rel))
continue;
pg_att = rtc->rel->rd_att->attrs[pg_depend->objsubid - 1];
if (pg_att->attisdropped || pg_att->atttypid != domainOid)
continue;
/*
* Okay, add column to result. We store the columns in column-number
* order; this is just a hack to improve predictability of regression
* test output ...
*/
Assert(rtc->natts < RelationGetNumberOfAttributes(rtc->rel));
ptr = rtc->natts++;
while (ptr > 0 && rtc->atts[ptr-1] > pg_depend->objsubid)
{
rtc->atts[ptr] = rtc->atts[ptr-1];
ptr--;
}
rtc->atts[ptr] = pg_depend->objsubid;
}
heap_endscan(classScan);
systable_endscan(depScan);
/* Release pg_class, hold pg_attribute for further processing */
relation_close(classRel, ExclusiveLock);
relation_close(attRel, NoLock);
relation_close(depRel, AccessShareLock);
return rels;
return result;
}
/*
* domainPermissionCheck
* domainOwnerCheck
*
* Throw an error if the current user doesn't have permission to modify
* the domain in an ALTER DOMAIN statement, or if the type isn't actually
* a domain.
*/
void
domainPermissionCheck(HeapTuple tup, TypeName *typename)
static void
domainOwnerCheck(HeapTuple tup, TypeName *typename)
{
Form_pg_type typTup = (Form_pg_type) GETSTRUCT(tup);
/* Permission check: must own type or its namespace */
if (!pg_type_ownercheck(HeapTupleGetOid(tup), GetUserId()) &&
!pg_namespace_ownercheck(typTup->typnamespace,
GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, TypeNameToString(typename));
/* Check that this is actually a domain */
if (typTup->typtype != 'd')
elog(ERROR, "%s is not a domain",
TypeNameToString(typename));
}
/* Permission check: must own type */
if (!pg_type_ownercheck(HeapTupleGetOid(tup), GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, TypeNameToString(typename));
}
/*
* domainAddConstraint - code shared between CREATE and ALTER DOMAIN

View File

@ -201,19 +201,19 @@ create table domnotnull
);
insert into domnotnull default values;
alter domain dnotnulltest set not null; -- fails
ERROR: ALTER DOMAIN: Relation "domnotnull" Attribute "col1" contains NULL values
ERROR: ALTER DOMAIN: Relation "domnotnull" attribute "col1" contains NULL values
update domnotnull set col1 = 5;
alter domain dnotnulltest set not null; -- fails
ERROR: ALTER DOMAIN: Relation "domnotnull" Attribute "col2" contains NULL values
ERROR: ALTER DOMAIN: Relation "domnotnull" attribute "col2" contains NULL values
update domnotnull set col2 = 6;
alter domain dnotnulltest set not null;
alter domain dnotnulltest set not null; -- fails
ERROR: AlterDomain: dnotnulltest is already set to NOT NULL
NOTICE: AlterDomain: dnotnulltest is already set to NOT NULL
update domnotnull set col1 = null; -- fails
ERROR: Domain dnotnulltest does not allow NULL values
alter domain dnotnulltest drop not null;
alter domain dnotnulltest drop not null; -- fails
ERROR: AlterDomain: dnotnulltest is already set to NULL
NOTICE: AlterDomain: dnotnulltest is already set to NULL
update domnotnull set col1 = null;
drop domain dnotnulltest cascade;
NOTICE: Drop cascades to table domnotnull column col2
@ -253,7 +253,7 @@ create table domcontest (col1 con);
insert into domcontest values (1);
insert into domcontest values (2);
alter domain con add constraint t check (VALUE < 1); -- fails
ERROR: AlterDomainAddConstraint: Domain con constraint t failed
ERROR: ALTER DOMAIN: Relation "domcontest" attribute "col1" contains values that fail the new constraint
alter domain con add constraint t check (VALUE < 34);
alter domain con add check (VALUE > 0);
insert into domcontest values (-5); -- fails