postgresql/src/backend/commands/comment.c

625 lines
18 KiB
C

/*-------------------------------------------------------------------------
*
* comment.c
*
* PostgreSQL object comments utility code.
*
* Copyright (c) 1996-2011, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/commands/comment.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "catalog/indexing.h"
#include "catalog/objectaddress.h"
#include "catalog/pg_description.h"
#include "catalog/pg_shdescription.h"
#include "commands/comment.h"
#include "commands/dbcommands.h"
#include "libpq/be-fsstubs.h"
#include "miscadmin.h"
#include "parser/parse_func.h"
#include "parser/parse_type.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/rel.h"
#include "utils/tqual.h"
/*
 * For most object types, the permissions-checking logic is simple enough
 * that it makes sense to just include it in CommentObject().  However, a few
 * object types require something more complex; for those, we define helper
 * functions.
 */
/* Ownership and relkind checks applied when commenting on a column. */
static void CheckAttributeComment(Relation relation);
/* Ownership check applied when commenting on a cast. */
static void CheckCastComment(List *qualname, List *arguments);
/*
 * CommentObject --
 *
 * Carry out the comment-setting command described by stmt: look up the
 * object the statement names, check that the current user is allowed to
 * comment on it, and then pass the comment text to CreateComments() or
 * CreateSharedComments() for storage.
 */
void
CommentObject(CommentStmt *stmt)
{
	ObjectAddress target;
	Relation	rel;
	Oid			userid;

	/*
	 * A dump being restored may contain COMMENT ON DATABASE naming the
	 * database by its old name.  Raising an error would keep pg_restore from
	 * completing (arguably pg_restore's own fault, but we work around it
	 * here), so an unknown database name is reported as a WARNING and the
	 * command does nothing further.  A name list whose length is not 1 is
	 * deliberately left for get_object_address() to reject below.
	 */
	if (stmt->objtype == OBJECT_DATABASE && list_length(stmt->objname) == 1)
	{
		char	   *dbname = strVal(linitial(stmt->objname));
		Oid			dboid = get_database_oid(dbname, true);

		if (!OidIsValid(dboid))
		{
			ereport(WARNING,
					(errcode(ERRCODE_UNDEFINED_DATABASE),
					 errmsg("database \"%s\" does not exist", dbname)));
			return;
		}
	}

	/*
	 * Turn the parser's description of the object into an ObjectAddress.
	 * get_object_address() throws an error if the object does not exist, and
	 * also locks the target to guard against concurrent DROP operations.
	 */
	target = get_object_address(stmt->objtype, stmt->objname, stmt->objargs,
								&rel, ShareUpdateExclusiveLock);

	/* The ownership tests below are all made against the current user. */
	userid = GetUserId();

	/* Privilege and integrity checks, grouped by the kind of test needed. */
	switch (stmt->objtype)
	{
			/* Relations, and objects attached to a relation */
		case OBJECT_INDEX:
		case OBJECT_SEQUENCE:
		case OBJECT_TABLE:
		case OBJECT_VIEW:
		case OBJECT_FOREIGN_TABLE:
		case OBJECT_RULE:
		case OBJECT_TRIGGER:
		case OBJECT_CONSTRAINT:
			if (!pg_class_ownercheck(RelationGetRelid(rel), userid))
				aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
							   RelationGetRelationName(rel));
			break;

			/* Object types whose checks live in helper functions */
		case OBJECT_COLUMN:
			CheckAttributeComment(rel);
			break;
		case OBJECT_CAST:
			CheckCastComment(stmt->objname, stmt->objargs);
			break;

			/* Object types reported by a single, unqualified name */
		case OBJECT_DATABASE:
			if (!pg_database_ownercheck(target.objectId, userid))
				aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
							   strVal(linitial(stmt->objname)));
			break;
		case OBJECT_SCHEMA:
			if (!pg_namespace_ownercheck(target.objectId, userid))
				aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_NAMESPACE,
							   strVal(linitial(stmt->objname)));
			break;
		case OBJECT_TABLESPACE:
			if (!pg_tablespace_ownercheck(target.objectId, userid))
				aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TABLESPACE,
							   strVal(linitial(stmt->objname)));
			break;
		case OBJECT_ROLE:
			if (!has_privs_of_role(userid, target.objectId))
				ereport(ERROR,
						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
						 errmsg("must be member of role \"%s\" to comment upon it",
								strVal(linitial(stmt->objname)))));
			break;

			/* Types are reported through format_type_be() */
		case OBJECT_TYPE:
			if (!pg_type_ownercheck(target.objectId, userid))
				aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TYPE,
							   format_type_be(target.objectId));
			break;

			/* Object types reported by their (possibly qualified) name list */
		case OBJECT_AGGREGATE:
		case OBJECT_FUNCTION:
			if (!pg_proc_ownercheck(target.objectId, userid))
				aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_PROC,
							   NameListToString(stmt->objname));
			break;
		case OBJECT_OPERATOR:
			if (!pg_oper_ownercheck(target.objectId, userid))
				aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_OPER,
							   NameListToString(stmt->objname));
			break;
		case OBJECT_CONVERSION:
			if (!pg_conversion_ownercheck(target.objectId, userid))
				aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CONVERSION,
							   NameListToString(stmt->objname));
			break;
		case OBJECT_OPCLASS:
			if (!pg_opclass_ownercheck(target.objectId, userid))
				aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_OPCLASS,
							   NameListToString(stmt->objname));
			break;
		case OBJECT_OPFAMILY:
			if (!pg_opfamily_ownercheck(target.objectId, userid))
				aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_OPFAMILY,
							   NameListToString(stmt->objname));
			break;
		case OBJECT_TSDICTIONARY:
			if (!pg_ts_dict_ownercheck(target.objectId, userid))
				aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TSDICTIONARY,
							   NameListToString(stmt->objname));
			break;
		case OBJECT_TSCONFIGURATION:
			if (!pg_ts_config_ownercheck(target.objectId, userid))
				aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TSCONFIGURATION,
							   NameListToString(stmt->objname));
			break;

			/* The ownership test is skipped when lo_compat_privileges is set */
		case OBJECT_LARGEOBJECT:
			if (!lo_compat_privileges &&
				!pg_largeobject_ownercheck(target.objectId, userid))
				ereport(ERROR,
						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
						 errmsg("must be owner of large object %u",
								target.objectId)));
			break;

			/* Object types that only a superuser may comment on */
		case OBJECT_LANGUAGE:
			if (!superuser())
				ereport(ERROR,
						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
						 errmsg("must be superuser to comment on procedural language")));
			break;
		case OBJECT_TSPARSER:
			if (!superuser())
				ereport(ERROR,
						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
						 errmsg("must be superuser to comment on text search parser")));
			break;
		case OBJECT_TSTEMPLATE:
			if (!superuser())
				ereport(ERROR,
						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
						 errmsg("must be superuser to comment on text search template")));
			break;

		default:
			elog(ERROR, "unrecognized object type: %d",
				 (int) stmt->objtype);
	}

	/*
	 * Databases, tablespaces, and roles are cluster-wide objects, so their
	 * comments are recorded in the shared pg_shdescription catalog.  Comments
	 * on every other object type are recorded in pg_description.
	 */
	switch (stmt->objtype)
	{
		case OBJECT_DATABASE:
		case OBJECT_TABLESPACE:
		case OBJECT_ROLE:
			CreateSharedComments(target.objectId, target.classId,
								 stmt->comment);
			break;
		default:
			CreateComments(target.objectId, target.classId,
						   target.objectSubId, stmt->comment);
			break;
	}

	/*
	 * If get_object_address() opened a relation for us, close it to keep the
	 * reference count correct.  NoLock is passed so that the lock taken by
	 * get_object_address() is retained until commit time, to guard against
	 * concurrent activity.
	 */
	if (rel != NULL)
		relation_close(rel, NoLock);
}
/*
 * CreateComments --
 *
 * Record a comment for the object identified by (oid, classoid, subid) in
 * pg_description.  If a tuple with that key already exists it is replaced;
 * otherwise a new tuple is inserted.
 *
 * A NULL or zero-length comment means "no comment": any existing tuple for
 * the key is deleted and nothing is inserted.
 */
void
CreateComments(Oid oid, Oid classoid, int32 subid, char *comment)
{
	Relation	pgdesc;
	ScanKeyData keys[3];
	SysScanDesc scan;
	HeapTuple	existing;
	HeapTuple	result = NULL;
	Datum		values[Natts_pg_description];
	bool		nulls[Natts_pg_description];
	bool		replaces[Natts_pg_description];

	/* Treat an empty string exactly like NULL */
	if (comment != NULL && comment[0] == '\0')
		comment = NULL;

	/* The column arrays are only needed when there is text to store */
	if (comment != NULL)
	{
		int			col = 0;

		/* No column is null, and an update replaces every column */
		memset(nulls, false, sizeof(nulls));
		memset(replaces, true, sizeof(replaces));

		values[col++] = ObjectIdGetDatum(oid);
		values[col++] = ObjectIdGetDatum(classoid);
		values[col++] = Int32GetDatum(subid);
		values[col++] = CStringGetTextDatum(comment);
	}

	/* Build the three-column key used to look up an existing tuple */
	ScanKeyInit(&keys[0],
				Anum_pg_description_objoid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(oid));
	ScanKeyInit(&keys[1],
				Anum_pg_description_classoid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(classoid));
	ScanKeyInit(&keys[2],
				Anum_pg_description_objsubid,
				BTEqualStrategyNumber, F_INT4EQ,
				Int32GetDatum(subid));

	pgdesc = heap_open(DescriptionRelationId, RowExclusiveLock);

	scan = systable_beginscan(pgdesc, DescriptionObjIndexId, true,
							  SnapshotNow, 3, keys);

	/* Only one match is assumed, so just the first tuple is examined */
	existing = systable_getnext(scan);
	if (existing != NULL)
	{
		if (comment != NULL)
		{
			result = heap_modify_tuple(existing, RelationGetDescr(pgdesc),
									   values, nulls, replaces);
			simple_heap_update(pgdesc, &existing->t_self, result);
		}
		else
			simple_heap_delete(pgdesc, &existing->t_self);
	}

	systable_endscan(scan);

	/* Nothing was updated but there is text to store: insert a new tuple */
	if (result == NULL && comment != NULL)
	{
		result = heap_form_tuple(RelationGetDescr(pgdesc),
								 values, nulls);
		simple_heap_insert(pgdesc, result);
	}

	/* A new or modified tuple needs its index entries made */
	if (result != NULL)
	{
		CatalogUpdateIndexes(pgdesc, result);
		heap_freetuple(result);
	}

	/* Done */
	heap_close(pgdesc, NoLock);
}
/*
 * CreateSharedComments --
 *
 * Record a comment for the shared object identified by (oid, classoid) in
 * pg_shdescription.  If a tuple with that key already exists it is
 * replaced; otherwise a new tuple is inserted.
 *
 * A NULL or zero-length comment means "no comment": any existing tuple for
 * the key is deleted and nothing is inserted.
 */
void
CreateSharedComments(Oid oid, Oid classoid, char *comment)
{
	Relation	pgshdesc;
	ScanKeyData keys[2];
	SysScanDesc scan;
	HeapTuple	existing;
	HeapTuple	result = NULL;
	Datum		values[Natts_pg_shdescription];
	bool		nulls[Natts_pg_shdescription];
	bool		replaces[Natts_pg_shdescription];

	/* Treat an empty string exactly like NULL */
	if (comment != NULL && comment[0] == '\0')
		comment = NULL;

	/* The column arrays are only needed when there is text to store */
	if (comment != NULL)
	{
		int			col = 0;

		/* No column is null, and an update replaces every column */
		memset(nulls, false, sizeof(nulls));
		memset(replaces, true, sizeof(replaces));

		values[col++] = ObjectIdGetDatum(oid);
		values[col++] = ObjectIdGetDatum(classoid);
		values[col++] = CStringGetTextDatum(comment);
	}

	/* Build the two-column key used to look up an existing tuple */
	ScanKeyInit(&keys[0],
				Anum_pg_shdescription_objoid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(oid));
	ScanKeyInit(&keys[1],
				Anum_pg_shdescription_classoid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(classoid));

	pgshdesc = heap_open(SharedDescriptionRelationId, RowExclusiveLock);

	scan = systable_beginscan(pgshdesc, SharedDescriptionObjIndexId, true,
							  SnapshotNow, 2, keys);

	/* Only one match is assumed, so just the first tuple is examined */
	existing = systable_getnext(scan);
	if (existing != NULL)
	{
		if (comment != NULL)
		{
			result = heap_modify_tuple(existing, RelationGetDescr(pgshdesc),
									   values, nulls, replaces);
			simple_heap_update(pgshdesc, &existing->t_self, result);
		}
		else
			simple_heap_delete(pgshdesc, &existing->t_self);
	}

	systable_endscan(scan);

	/* Nothing was updated but there is text to store: insert a new tuple */
	if (result == NULL && comment != NULL)
	{
		result = heap_form_tuple(RelationGetDescr(pgshdesc),
								 values, nulls);
		simple_heap_insert(pgshdesc, result);
	}

	/* A new or modified tuple needs its index entries made */
	if (result != NULL)
	{
		CatalogUpdateIndexes(pgshdesc, result);
		heap_freetuple(result);
	}

	/* Done */
	heap_close(pgshdesc, NoLock);
}
/*
 * DeleteComments -- remove comments for an object
 *
 * With a nonzero subid, only the comment whose objsubid matches is removed.
 * With subid zero, every comment matching oid/classoid is removed, whatever
 * its objsubid (this corresponds to deleting a whole object).
 */
void
DeleteComments(Oid oid, Oid classoid, int32 subid)
{
	Relation	pgdesc;
	ScanKeyData keys[3];
	int			nkeys = 2;
	SysScanDesc scan;
	HeapTuple	victim;

	/* The first two key columns are always part of the search */
	ScanKeyInit(&keys[0],
				Anum_pg_description_objoid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(oid));
	ScanKeyInit(&keys[1],
				Anum_pg_description_classoid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(classoid));

	/* The sub-object column is added only when a specific one was given */
	if (subid != 0)
	{
		ScanKeyInit(&keys[2],
					Anum_pg_description_objsubid,
					BTEqualStrategyNumber, F_INT4EQ,
					Int32GetDatum(subid));
		nkeys = 3;
	}

	pgdesc = heap_open(DescriptionRelationId, RowExclusiveLock);

	scan = systable_beginscan(pgdesc, DescriptionObjIndexId, true,
							  SnapshotNow, nkeys, keys);

	/* Delete every tuple the scan returns */
	for (victim = systable_getnext(scan);
		 victim != NULL;
		 victim = systable_getnext(scan))
		simple_heap_delete(pgdesc, &victim->t_self);

	/* Done */
	systable_endscan(scan);
	heap_close(pgdesc, RowExclusiveLock);
}
/*
 * DeleteSharedComments -- remove comments for a shared object
 *
 * Every pg_shdescription tuple matching oid/classoid is deleted.
 */
void
DeleteSharedComments(Oid oid, Oid classoid)
{
	Relation	pgshdesc;
	ScanKeyData keys[2];
	SysScanDesc scan;
	HeapTuple	victim;

	/* Both key columns are used to find the matching tuples */
	ScanKeyInit(&keys[0],
				Anum_pg_shdescription_objoid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(oid));
	ScanKeyInit(&keys[1],
				Anum_pg_shdescription_classoid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(classoid));

	pgshdesc = heap_open(SharedDescriptionRelationId, RowExclusiveLock);

	scan = systable_beginscan(pgshdesc, SharedDescriptionObjIndexId, true,
							  SnapshotNow, 2, keys);

	/* Delete every tuple the scan returns */
	for (victim = systable_getnext(scan);
		 victim != NULL;
		 victim = systable_getnext(scan))
		simple_heap_delete(pgshdesc, &victim->t_self);

	/* Done */
	systable_endscan(scan);
	heap_close(pgshdesc, RowExclusiveLock);
}
/*
 * GetComment -- fetch the comment for an object
 *
 * Returns the description text converted with TextDatumGetCString(), or
 * NULL when no pg_description tuple matches (oid, classoid, subid) or the
 * matching tuple's description column is null.
 */
char *
GetComment(Oid oid, Oid classoid, int32 subid)
{
	Relation	pgdesc;
	ScanKeyData keys[3];
	SysScanDesc scan;
	TupleDesc	rowdesc;
	HeapTuple	row;
	char	   *result = NULL;

	/* Build the three-column key identifying the comment */
	ScanKeyInit(&keys[0],
				Anum_pg_description_objoid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(oid));
	ScanKeyInit(&keys[1],
				Anum_pg_description_classoid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(classoid));
	ScanKeyInit(&keys[2],
				Anum_pg_description_objsubid,
				BTEqualStrategyNumber, F_INT4EQ,
				Int32GetDatum(subid));

	pgdesc = heap_open(DescriptionRelationId, AccessShareLock);
	rowdesc = RelationGetDescr(pgdesc);

	scan = systable_beginscan(pgdesc, DescriptionObjIndexId, true,
							  SnapshotNow, 3, keys);

	/* Only one match is assumed, so just the first tuple is examined */
	row = systable_getnext(scan);
	if (row != NULL)
	{
		bool		isnull;
		Datum		descr;

		descr = heap_getattr(row, Anum_pg_description_description,
							 rowdesc, &isnull);
		if (!isnull)
			result = TextDatumGetCString(descr);
	}

	systable_endscan(scan);

	/* Done */
	heap_close(pgdesc, AccessShareLock);

	return result;
}
/*
 * Verify that the current user may comment on a column of the given
 * relation: the user must own the relation, and the relation must be of a
 * kind for which per-column comments are allowed.  Raises an error
 * otherwise.
 */
static void
CheckAttributeComment(Relation relation)
{
	/* Ownership of the containing relation is required */
	if (!pg_class_ownercheck(RelationGetRelid(relation), GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
					   RelationGetRelationName(relation));

	/*
	 * Per-column comments are accepted only for tables, views, composite
	 * types, and foreign tables, which are the only relkinds for which
	 * pg_dump will dump them.  Index columns in particular are excluded:
	 * their names may change across PG versions, so dumping comments on them
	 * could create reload failures.
	 */
	switch (relation->rd_rel->relkind)
	{
		case RELKIND_RELATION:
		case RELKIND_VIEW:
		case RELKIND_COMPOSITE_TYPE:
		case RELKIND_FOREIGN_TABLE:
			/* acceptable */
			break;
		default:
			ereport(ERROR,
					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
					 errmsg("\"%s\" is not a table, view, composite type, or foreign table",
							RelationGetRelationName(relation))));
			break;
	}
}
/*
 * Verify that the current user may comment on the cast described by the
 * given source type (sole element of qualname) and target type (sole
 * element of arguments).  Owning either of the two types is sufficient;
 * otherwise an error is raised.
 */
static void
CheckCastComment(List *qualname, List *arguments)
{
	TypeName   *srcname;
	TypeName   *dstname;
	Oid			srcoid;
	Oid			dstoid;
	Oid			userid;

	/* Each list is expected to hold exactly one TypeName node */
	Assert(list_length(qualname) == 1);
	srcname = (TypeName *) linitial(qualname);
	Assert(IsA(srcname, TypeName));

	Assert(list_length(arguments) == 1);
	dstname = (TypeName *) linitial(arguments);
	Assert(IsA(dstname, TypeName));

	/* Resolve both type names to OIDs, source first */
	srcoid = typenameTypeId(NULL, srcname);
	dstoid = typenameTypeId(NULL, dstname);

	/* Permission check: owning either type is enough */
	userid = GetUserId();
	if (pg_type_ownercheck(srcoid, userid) ||
		pg_type_ownercheck(dstoid, userid))
		return;

	ereport(ERROR,
			(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
			 errmsg("must be owner of type %s or type %s",
					format_type_be(srcoid),
					format_type_be(dstoid))));
}