postgresql/src/backend/commands/comment.c
Tom Lane d9572c4e3b Core support for "extensions", which are packages of SQL objects.
This patch adds the server infrastructure to support extensions.
There is still one significant loose end, namely how to make it play nice
with pg_upgrade, so I am not yet committing the changes that would make
all the contrib modules depend on this feature.

In passing, fix a disturbingly large amount of breakage in
AlterObjectNamespace() and callers.

Dimitri Fontaine, reviewed by Anssi Kääriäinen,
Itagaki Takahiro, Tom Lane, and numerous others
2011-02-08 16:13:22 -05:00

631 lines
18 KiB
C

/*-------------------------------------------------------------------------
*
* comment.c
*
* PostgreSQL object comments utility code.
*
* Copyright (c) 1996-2011, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/commands/comment.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "catalog/indexing.h"
#include "catalog/objectaddress.h"
#include "catalog/pg_description.h"
#include "catalog/pg_shdescription.h"
#include "commands/comment.h"
#include "commands/dbcommands.h"
#include "libpq/be-fsstubs.h"
#include "miscadmin.h"
#include "parser/parse_func.h"
#include "parser/parse_type.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/rel.h"
#include "utils/tqual.h"
/*
* For most object types, the permissions-checking logic is simple enough
* that it makes sense to just include it in CommentObject(). However, a few
* object types require something more complex; for those, we define helper
* functions.
*/
static void CheckAttributeComment(Relation relation);
static void CheckCastComment(List *qualname, List *arguments);
/*
* CommentObject --
*
* This routine is used to add the associated comment into
* pg_description for the object specified by the given SQL command.
*/
void
CommentObject(CommentStmt *stmt)
{
ObjectAddress address;
Relation relation;
/*
* When loading a dump, we may see a COMMENT ON DATABASE for the old name
* of the database. Erroring out would prevent pg_restore from completing
* (which is really pg_restore's fault, but for now we will work around
* the problem here). Consensus is that the best fix is to treat wrong
* database name as a WARNING not an ERROR; hence, the following special
* case. (If the length of stmt->objname is not 1, get_object_address will
* throw an error below; that's OK.)
*/
if (stmt->objtype == OBJECT_DATABASE && list_length(stmt->objname) == 1)
{
char *database = strVal(linitial(stmt->objname));
if (!OidIsValid(get_database_oid(database, true)))
{
ereport(WARNING,
(errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database \"%s\" does not exist", database)));
return;
}
}
/*
* Translate the parser representation which identifies this object into
* an ObjectAddress. get_object_address() will throw an error if the
* object does not exist, and will also acquire a lock on the target
* to guard against concurrent DROP operations.
*/
address = get_object_address(stmt->objtype, stmt->objname, stmt->objargs,
&relation, ShareUpdateExclusiveLock);
/* Privilege and integrity checks. */
switch (stmt->objtype)
{
case OBJECT_INDEX:
case OBJECT_SEQUENCE:
case OBJECT_TABLE:
case OBJECT_VIEW:
case OBJECT_FOREIGN_TABLE:
if (!pg_class_ownercheck(RelationGetRelid(relation), GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
RelationGetRelationName(relation));
break;
case OBJECT_COLUMN:
CheckAttributeComment(relation);
break;
case OBJECT_DATABASE:
if (!pg_database_ownercheck(address.objectId, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
strVal(linitial(stmt->objname)));
break;
case OBJECT_TYPE:
if (!pg_type_ownercheck(address.objectId, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TYPE,
format_type_be(address.objectId));
break;
case OBJECT_AGGREGATE:
case OBJECT_FUNCTION:
if (!pg_proc_ownercheck(address.objectId, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_PROC,
NameListToString(stmt->objname));
break;
case OBJECT_OPERATOR:
if (!pg_oper_ownercheck(address.objectId, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_OPER,
NameListToString(stmt->objname));
break;
case OBJECT_RULE:
case OBJECT_TRIGGER:
case OBJECT_CONSTRAINT:
if (!pg_class_ownercheck(RelationGetRelid(relation), GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
RelationGetRelationName(relation));
break;
case OBJECT_SCHEMA:
if (!pg_namespace_ownercheck(address.objectId, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_NAMESPACE,
strVal(linitial(stmt->objname)));
break;
case OBJECT_CONVERSION:
if (!pg_conversion_ownercheck(address.objectId, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CONVERSION,
NameListToString(stmt->objname));
break;
case OBJECT_LANGUAGE:
if (!superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to comment on procedural language")));
break;
case OBJECT_EXTENSION:
if (!superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to comment on extension")));
break;
case OBJECT_OPCLASS:
if (!pg_opclass_ownercheck(address.objectId, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_OPCLASS,
NameListToString(stmt->objname));
break;
case OBJECT_OPFAMILY:
if (!pg_opfamily_ownercheck(address.objectId, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_OPFAMILY,
NameListToString(stmt->objname));
break;
case OBJECT_LARGEOBJECT:
if (!lo_compat_privileges &&
!pg_largeobject_ownercheck(address.objectId, GetUserId()))
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be owner of large object %u",
address.objectId)));
break;
case OBJECT_CAST:
CheckCastComment(stmt->objname, stmt->objargs);
break;
case OBJECT_TABLESPACE:
if (!pg_tablespace_ownercheck(address.objectId, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TABLESPACE,
strVal(linitial(stmt->objname)));
break;
case OBJECT_ROLE:
if (!has_privs_of_role(GetUserId(), address.objectId))
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be member of role \"%s\" to comment upon it",
strVal(linitial(stmt->objname)))));
break;
case OBJECT_TSPARSER:
if (!superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to comment on text search parser")));
break;
case OBJECT_TSDICTIONARY:
if (!pg_ts_dict_ownercheck(address.objectId, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TSDICTIONARY,
NameListToString(stmt->objname));
break;
case OBJECT_TSTEMPLATE:
if (!superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to comment on text search template")));
break;
case OBJECT_TSCONFIGURATION:
if (!pg_ts_config_ownercheck(address.objectId, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TSCONFIGURATION,
NameListToString(stmt->objname));
break;
default:
elog(ERROR, "unrecognized object type: %d",
(int) stmt->objtype);
}
/*
* Databases, tablespaces, and roles are cluster-wide objects, so any
* comments on those objects are recorded in the shared pg_shdescription
* catalog. Comments on all other objects are recorded in pg_description.
*/
if (stmt->objtype == OBJECT_DATABASE || stmt->objtype == OBJECT_TABLESPACE
|| stmt->objtype == OBJECT_ROLE)
CreateSharedComments(address.objectId, address.classId, stmt->comment);
else
CreateComments(address.objectId, address.classId, address.objectSubId,
stmt->comment);
/*
* If get_object_address() opened the relation for us, we close it to keep
* the reference count correct - but we retain any locks acquired by
* get_object_address() until commit time, to guard against concurrent
* activity.
*/
if (relation != NULL)
relation_close(relation, NoLock);
}
/*
* CreateComments --
*
* Create a comment for the specified object descriptor. Inserts a new
* pg_description tuple, or replaces an existing one with the same key.
*
* If the comment given is null or an empty string, instead delete any
* existing comment for the specified key.
*/
void
CreateComments(Oid oid, Oid classoid, int32 subid, char *comment)
{
Relation description;
ScanKeyData skey[3];
SysScanDesc sd;
HeapTuple oldtuple;
HeapTuple newtuple = NULL;
Datum values[Natts_pg_description];
bool nulls[Natts_pg_description];
bool replaces[Natts_pg_description];
int i;
/* Reduce empty-string to NULL case */
if (comment != NULL && strlen(comment) == 0)
comment = NULL;
/* Prepare to form or update a tuple, if necessary */
if (comment != NULL)
{
for (i = 0; i < Natts_pg_description; i++)
{
nulls[i] = false;
replaces[i] = true;
}
i = 0;
values[i++] = ObjectIdGetDatum(oid);
values[i++] = ObjectIdGetDatum(classoid);
values[i++] = Int32GetDatum(subid);
values[i++] = CStringGetTextDatum(comment);
}
/* Use the index to search for a matching old tuple */
ScanKeyInit(&skey[0],
Anum_pg_description_objoid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(oid));
ScanKeyInit(&skey[1],
Anum_pg_description_classoid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(classoid));
ScanKeyInit(&skey[2],
Anum_pg_description_objsubid,
BTEqualStrategyNumber, F_INT4EQ,
Int32GetDatum(subid));
description = heap_open(DescriptionRelationId, RowExclusiveLock);
sd = systable_beginscan(description, DescriptionObjIndexId, true,
SnapshotNow, 3, skey);
while ((oldtuple = systable_getnext(sd)) != NULL)
{
/* Found the old tuple, so delete or update it */
if (comment == NULL)
simple_heap_delete(description, &oldtuple->t_self);
else
{
newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(description), values,
nulls, replaces);
simple_heap_update(description, &oldtuple->t_self, newtuple);
}
break; /* Assume there can be only one match */
}
systable_endscan(sd);
/* If we didn't find an old tuple, insert a new one */
if (newtuple == NULL && comment != NULL)
{
newtuple = heap_form_tuple(RelationGetDescr(description),
values, nulls);
simple_heap_insert(description, newtuple);
}
/* Update indexes, if necessary */
if (newtuple != NULL)
{
CatalogUpdateIndexes(description, newtuple);
heap_freetuple(newtuple);
}
/* Done */
heap_close(description, NoLock);
}
/*
* CreateSharedComments --
*
* Create a comment for the specified shared object descriptor. Inserts a
* new pg_shdescription tuple, or replaces an existing one with the same key.
*
* If the comment given is null or an empty string, instead delete any
* existing comment for the specified key.
*/
void
CreateSharedComments(Oid oid, Oid classoid, char *comment)
{
Relation shdescription;
ScanKeyData skey[2];
SysScanDesc sd;
HeapTuple oldtuple;
HeapTuple newtuple = NULL;
Datum values[Natts_pg_shdescription];
bool nulls[Natts_pg_shdescription];
bool replaces[Natts_pg_shdescription];
int i;
/* Reduce empty-string to NULL case */
if (comment != NULL && strlen(comment) == 0)
comment = NULL;
/* Prepare to form or update a tuple, if necessary */
if (comment != NULL)
{
for (i = 0; i < Natts_pg_shdescription; i++)
{
nulls[i] = false;
replaces[i] = true;
}
i = 0;
values[i++] = ObjectIdGetDatum(oid);
values[i++] = ObjectIdGetDatum(classoid);
values[i++] = CStringGetTextDatum(comment);
}
/* Use the index to search for a matching old tuple */
ScanKeyInit(&skey[0],
Anum_pg_shdescription_objoid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(oid));
ScanKeyInit(&skey[1],
Anum_pg_shdescription_classoid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(classoid));
shdescription = heap_open(SharedDescriptionRelationId, RowExclusiveLock);
sd = systable_beginscan(shdescription, SharedDescriptionObjIndexId, true,
SnapshotNow, 2, skey);
while ((oldtuple = systable_getnext(sd)) != NULL)
{
/* Found the old tuple, so delete or update it */
if (comment == NULL)
simple_heap_delete(shdescription, &oldtuple->t_self);
else
{
newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(shdescription),
values, nulls, replaces);
simple_heap_update(shdescription, &oldtuple->t_self, newtuple);
}
break; /* Assume there can be only one match */
}
systable_endscan(sd);
/* If we didn't find an old tuple, insert a new one */
if (newtuple == NULL && comment != NULL)
{
newtuple = heap_form_tuple(RelationGetDescr(shdescription),
values, nulls);
simple_heap_insert(shdescription, newtuple);
}
/* Update indexes, if necessary */
if (newtuple != NULL)
{
CatalogUpdateIndexes(shdescription, newtuple);
heap_freetuple(newtuple);
}
/* Done */
heap_close(shdescription, NoLock);
}
/*
* DeleteComments -- remove comments for an object
*
* If subid is nonzero then only comments matching it will be removed.
* If subid is zero, all comments matching the oid/classoid will be removed
* (this corresponds to deleting a whole object).
*/
void
DeleteComments(Oid oid, Oid classoid, int32 subid)
{
Relation description;
ScanKeyData skey[3];
int nkeys;
SysScanDesc sd;
HeapTuple oldtuple;
/* Use the index to search for all matching old tuples */
ScanKeyInit(&skey[0],
Anum_pg_description_objoid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(oid));
ScanKeyInit(&skey[1],
Anum_pg_description_classoid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(classoid));
if (subid != 0)
{
ScanKeyInit(&skey[2],
Anum_pg_description_objsubid,
BTEqualStrategyNumber, F_INT4EQ,
Int32GetDatum(subid));
nkeys = 3;
}
else
nkeys = 2;
description = heap_open(DescriptionRelationId, RowExclusiveLock);
sd = systable_beginscan(description, DescriptionObjIndexId, true,
SnapshotNow, nkeys, skey);
while ((oldtuple = systable_getnext(sd)) != NULL)
simple_heap_delete(description, &oldtuple->t_self);
/* Done */
systable_endscan(sd);
heap_close(description, RowExclusiveLock);
}
/*
* DeleteSharedComments -- remove comments for a shared object
*/
void
DeleteSharedComments(Oid oid, Oid classoid)
{
Relation shdescription;
ScanKeyData skey[2];
SysScanDesc sd;
HeapTuple oldtuple;
/* Use the index to search for all matching old tuples */
ScanKeyInit(&skey[0],
Anum_pg_shdescription_objoid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(oid));
ScanKeyInit(&skey[1],
Anum_pg_shdescription_classoid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(classoid));
shdescription = heap_open(SharedDescriptionRelationId, RowExclusiveLock);
sd = systable_beginscan(shdescription, SharedDescriptionObjIndexId, true,
SnapshotNow, 2, skey);
while ((oldtuple = systable_getnext(sd)) != NULL)
simple_heap_delete(shdescription, &oldtuple->t_self);
/* Done */
systable_endscan(sd);
heap_close(shdescription, RowExclusiveLock);
}
/*
* GetComment -- get the comment for an object, or null if not found.
*/
char *
GetComment(Oid oid, Oid classoid, int32 subid)
{
Relation description;
ScanKeyData skey[3];
SysScanDesc sd;
TupleDesc tupdesc;
HeapTuple tuple;
char *comment;
/* Use the index to search for a matching old tuple */
ScanKeyInit(&skey[0],
Anum_pg_description_objoid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(oid));
ScanKeyInit(&skey[1],
Anum_pg_description_classoid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(classoid));
ScanKeyInit(&skey[2],
Anum_pg_description_objsubid,
BTEqualStrategyNumber, F_INT4EQ,
Int32GetDatum(subid));
description = heap_open(DescriptionRelationId, AccessShareLock);
tupdesc = RelationGetDescr(description);
sd = systable_beginscan(description, DescriptionObjIndexId, true,
SnapshotNow, 3, skey);
comment = NULL;
while ((tuple = systable_getnext(sd)) != NULL)
{
Datum value;
bool isnull;
/* Found the tuple, get description field */
value = heap_getattr(tuple, Anum_pg_description_description, tupdesc, &isnull);
if (!isnull)
comment = TextDatumGetCString(value);
break; /* Assume there can be only one match */
}
systable_endscan(sd);
/* Done */
heap_close(description, AccessShareLock);
return comment;
}
/*
* Check whether the user is allowed to comment on an attribute of the
* specified relation.
*/
static void
CheckAttributeComment(Relation relation)
{
if (!pg_class_ownercheck(RelationGetRelid(relation), GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
RelationGetRelationName(relation));
/*
* Allow comments only on columns of tables, views, composite types, and
* foreign tables (which are the only relkinds for which pg_dump will dump
* per-column comments). In particular we wish to disallow comments on
* index columns, because the naming of an index's columns may change
* across PG versions, so dumping per-column comments could create reload
* failures.
*/
if (relation->rd_rel->relkind != RELKIND_RELATION &&
relation->rd_rel->relkind != RELKIND_VIEW &&
relation->rd_rel->relkind != RELKIND_COMPOSITE_TYPE &&
relation->rd_rel->relkind != RELKIND_FOREIGN_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a table, view, composite type, or foreign table",
RelationGetRelationName(relation))));
}
/*
* Check whether the user is allowed to comment on the specified cast.
*/
static void
CheckCastComment(List *qualname, List *arguments)
{
TypeName *sourcetype;
TypeName *targettype;
Oid sourcetypeid;
Oid targettypeid;
Assert(list_length(qualname) == 1);
sourcetype = (TypeName *) linitial(qualname);
Assert(IsA(sourcetype, TypeName));
Assert(list_length(arguments) == 1);
targettype = (TypeName *) linitial(arguments);
Assert(IsA(targettype, TypeName));
sourcetypeid = typenameTypeId(NULL, sourcetype);
targettypeid = typenameTypeId(NULL, targettype);
/* Permission check */
if (!pg_type_ownercheck(sourcetypeid, GetUserId())
&& !pg_type_ownercheck(targettypeid, GetUserId()))
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be owner of type %s or type %s",
format_type_be(sourcetypeid),
format_type_be(targettypeid))));
}