/*-------------------------------------------------------------------------
 *
 * comment.c
 *
 * PostgreSQL object comments utility code.
 *
 * Copyright (c) 1996-2011, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  src/backend/commands/comment.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/genam.h"
#include "access/heapam.h"
#include "catalog/indexing.h"
#include "catalog/objectaddress.h"
#include "catalog/pg_description.h"
#include "catalog/pg_shdescription.h"
#include "commands/comment.h"
#include "commands/dbcommands.h"
#include "libpq/be-fsstubs.h"
#include "miscadmin.h"
#include "parser/parse_func.h"
#include "parser/parse_type.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/rel.h"
#include "utils/tqual.h"

/*
 * Most object types need only a one-line ownership test, which lives
 * directly in CommentObject().  The object types whose checks are more
 * involved get a dedicated helper, declared here.
 */
static void CheckAttributeComment(Relation relation);
static void CheckCastComment(List *qualname, List *arguments);


/*
 * CommentObject --
 *
 * Execute a COMMENT statement: resolve the object named by the parsed
 * command, verify that the current user may comment on it, and store (or
 * remove) the comment in pg_description or pg_shdescription.
 */
void
CommentObject(CommentStmt *stmt)
{
	ObjectAddress address;
	Relation	relation;

	/*
	 * A dump being restored may contain COMMENT ON DATABASE naming the
	 * database's old name.  Raising an error would stop pg_restore from
	 * completing (arguably pg_restore's fault, but it is worked around
	 * here), so an unknown database name produces only a WARNING and the
	 * command becomes a no-op.  When stmt->objname does not have exactly one
	 * element we fall through and let get_object_address() report the error.
	 */
	if (stmt->objtype == OBJECT_DATABASE && list_length(stmt->objname) == 1)
	{
		char	   *dbname = strVal(linitial(stmt->objname));

		if (!OidIsValid(get_database_oid(dbname, true)))
		{
			ereport(WARNING,
					(errcode(ERRCODE_UNDEFINED_DATABASE),
					 errmsg("database \"%s\" does not exist", dbname)));
			return;
		}
	}

	/*
	 * Convert the parser's description of the object into an ObjectAddress.
	 * get_object_address() errors out if the object does not exist, and
	 * locks the target so that it cannot be dropped concurrently.
	 */
	address = get_object_address(stmt->objtype, stmt->objname, stmt->objargs,
								 &relation, ShareUpdateExclusiveLock);

	/* Privilege and integrity checks, one arm per kind of object. */
	switch (stmt->objtype)
	{
			/*
			 * Relations, and objects that hang off a relation, are all
			 * governed by ownership of that relation.
			 */
		case OBJECT_INDEX:
		case OBJECT_SEQUENCE:
		case OBJECT_TABLE:
		case OBJECT_VIEW:
		case OBJECT_FOREIGN_TABLE:
		case OBJECT_RULE:
		case OBJECT_TRIGGER:
		case OBJECT_CONSTRAINT:
			if (!pg_class_ownercheck(RelationGetRelid(relation), GetUserId()))
				aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
							   RelationGetRelationName(relation));
			break;

		case OBJECT_COLUMN:
			CheckAttributeComment(relation);
			break;

		case OBJECT_DATABASE:
			if (!pg_database_ownercheck(address.objectId, GetUserId()))
				aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
							   strVal(linitial(stmt->objname)));
			break;

		case OBJECT_TYPE:
		case OBJECT_DOMAIN:
			if (!pg_type_ownercheck(address.objectId, GetUserId()))
				aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TYPE,
							   format_type_be(address.objectId));
			break;

		case OBJECT_AGGREGATE:
		case OBJECT_FUNCTION:
			if (!pg_proc_ownercheck(address.objectId, GetUserId()))
				aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_PROC,
							   NameListToString(stmt->objname));
			break;

		case OBJECT_OPERATOR:
			if (!pg_oper_ownercheck(address.objectId, GetUserId()))
				aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_OPER,
							   NameListToString(stmt->objname));
			break;

		case OBJECT_SCHEMA:
			if (!pg_namespace_ownercheck(address.objectId, GetUserId()))
				aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_NAMESPACE,
							   strVal(linitial(stmt->objname)));
			break;

		case OBJECT_COLLATION:
			if (!pg_collation_ownercheck(address.objectId, GetUserId()))
				aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_COLLATION,
							   NameListToString(stmt->objname));
			break;

		case OBJECT_CONVERSION:
			if (!pg_conversion_ownercheck(address.objectId, GetUserId()))
				aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CONVERSION,
							   NameListToString(stmt->objname));
			break;

		case OBJECT_LANGUAGE:
			if (!superuser())
				ereport(ERROR,
						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
						 errmsg("must be superuser to comment on procedural language")));
			break;

		case OBJECT_EXTENSION:
			if (!superuser())
				ereport(ERROR,
						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
						 errmsg("must be superuser to comment on extension")));
			break;

		case OBJECT_OPCLASS:
			if (!pg_opclass_ownercheck(address.objectId, GetUserId()))
				aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_OPCLASS,
							   NameListToString(stmt->objname));
			break;

		case OBJECT_OPFAMILY:
			if (!pg_opfamily_ownercheck(address.objectId, GetUserId()))
				aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_OPFAMILY,
							   NameListToString(stmt->objname));
			break;

		case OBJECT_LARGEOBJECT:
			/* No ownership test at all when lo_compat_privileges is set. */
			if (!lo_compat_privileges &&
				!pg_largeobject_ownercheck(address.objectId, GetUserId()))
				ereport(ERROR,
						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
						 errmsg("must be owner of large object %u",
								address.objectId)));
			break;

		case OBJECT_CAST:
			CheckCastComment(stmt->objname, stmt->objargs);
			break;

		case OBJECT_TABLESPACE:
			if (!pg_tablespace_ownercheck(address.objectId, GetUserId()))
				aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TABLESPACE,
							   strVal(linitial(stmt->objname)));
			break;

		case OBJECT_ROLE:
			if (!has_privs_of_role(GetUserId(), address.objectId))
				ereport(ERROR,
						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
						 errmsg("must be member of role \"%s\" to comment upon it",
								strVal(linitial(stmt->objname)))));
			break;

		case OBJECT_TSPARSER:
			if (!superuser())
				ereport(ERROR,
						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
						 errmsg("must be superuser to comment on text search parser")));
			break;

		case OBJECT_TSDICTIONARY:
			if (!pg_ts_dict_ownercheck(address.objectId, GetUserId()))
				aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TSDICTIONARY,
							   NameListToString(stmt->objname));
			break;

		case OBJECT_TSTEMPLATE:
			if (!superuser())
				ereport(ERROR,
						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
						 errmsg("must be superuser to comment on text search template")));
			break;

		case OBJECT_TSCONFIGURATION:
			if (!pg_ts_config_ownercheck(address.objectId, GetUserId()))
				aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TSCONFIGURATION,
							   NameListToString(stmt->objname));
			break;

		default:
			elog(ERROR, "unrecognized object type: %d",
				 (int) stmt->objtype);
	}

	/*
	 * Pick the catalog.  Databases, tablespaces and roles are cluster-wide,
	 * so their comments go into the shared pg_shdescription catalog; every
	 * other kind of object is recorded in pg_description.
	 */
	switch (stmt->objtype)
	{
		case OBJECT_DATABASE:
		case OBJECT_TABLESPACE:
		case OBJECT_ROLE:
			CreateSharedComments(address.objectId, address.classId,
								 stmt->comment);
			break;

		default:
			CreateComments(address.objectId, address.classId,
						   address.objectSubId, stmt->comment);
			break;
	}

	/*
	 * When get_object_address() handed back an open relation, close it so
	 * the reference count stays correct.  The lock it took is deliberately
	 * kept until commit to guard against concurrent activity.
	 */
	if (relation != NULL)
		relation_close(relation, NoLock);
}

/*
 * CreateComments --
 *
 * Store a comment for the object identified by (oid, classoid, subid) in
 * pg_description, overwriting any comment already recorded under that key.
 *
 * A NULL or empty comment instead removes whatever comment currently exists
 * for the key.
 */
void
CreateComments(Oid oid, Oid classoid, int32 subid, char *comment)
{
	Relation	pg_desc;
	ScanKeyData key[3];
	SysScanDesc scan;
	HeapTuple	existing;
	HeapTuple	replacement = NULL;
	Datum		values[Natts_pg_description];
	bool		nulls[Natts_pg_description];
	bool		replaces[Natts_pg_description];
	int			col;

	/* An empty string is handled exactly like no comment at all */
	if (comment != NULL && comment[0] == '\0')
		comment = NULL;

	/* Column data is only needed when there is a comment to store */
	if (comment != NULL)
	{
		for (col = 0; col < Natts_pg_description; col++)
		{
			nulls[col] = false;
			replaces[col] = true;
		}

		values[0] = ObjectIdGetDatum(oid);
		values[1] = ObjectIdGetDatum(classoid);
		values[2] = Int32GetDatum(subid);
		values[3] = CStringGetTextDatum(comment);
	}

	/* Look up any existing row for this key through the index */
	ScanKeyInit(&key[0],
				Anum_pg_description_objoid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(oid));
	ScanKeyInit(&key[1],
				Anum_pg_description_classoid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(classoid));
	ScanKeyInit(&key[2],
				Anum_pg_description_objsubid,
				BTEqualStrategyNumber, F_INT4EQ,
				Int32GetDatum(subid));

	pg_desc = heap_open(DescriptionRelationId, RowExclusiveLock);

	scan = systable_beginscan(pg_desc, DescriptionObjIndexId, true,
							  SnapshotNow, 3, key);

	/* At most one row is expected, so only the first match is examined */
	existing = systable_getnext(scan);
	if (existing != NULL)
	{
		if (comment == NULL)
			simple_heap_delete(pg_desc, &existing->t_self);
		else
		{
			replacement = heap_modify_tuple(existing,
											RelationGetDescr(pg_desc),
											values, nulls, replaces);
			simple_heap_update(pg_desc, &existing->t_self, replacement);
		}
	}

	systable_endscan(scan);

	/* Nothing was there to update: add a brand-new row */
	if (replacement == NULL && comment != NULL)
	{
		replacement = heap_form_tuple(RelationGetDescr(pg_desc),
									  values, nulls);
		simple_heap_insert(pg_desc, replacement);
	}

	/* A row was inserted or updated, so its index entries must follow */
	if (replacement != NULL)
	{
		CatalogUpdateIndexes(pg_desc, replacement);
		heap_freetuple(replacement);
	}

	/* Close the catalog; NoLock means the lock is not given up here */
	heap_close(pg_desc, NoLock);
}

/*
 * CreateSharedComments --
 *
 * Store a comment for the shared object identified by (oid, classoid) in
 * pg_shdescription, overwriting any comment already recorded under that key.
 *
 * A NULL or empty comment instead removes whatever comment currently exists
 * for the key.
 */
void
CreateSharedComments(Oid oid, Oid classoid, char *comment)
{
	Relation	pg_shdesc;
	ScanKeyData key[2];
	SysScanDesc scan;
	HeapTuple	existing;
	HeapTuple	replacement = NULL;
	Datum		values[Natts_pg_shdescription];
	bool		nulls[Natts_pg_shdescription];
	bool		replaces[Natts_pg_shdescription];
	int			col;

	/* An empty string is handled exactly like no comment at all */
	if (comment != NULL && comment[0] == '\0')
		comment = NULL;

	/* Column data is only needed when there is a comment to store */
	if (comment != NULL)
	{
		for (col = 0; col < Natts_pg_shdescription; col++)
		{
			nulls[col] = false;
			replaces[col] = true;
		}

		values[0] = ObjectIdGetDatum(oid);
		values[1] = ObjectIdGetDatum(classoid);
		values[2] = CStringGetTextDatum(comment);
	}

	/* Look up any existing row for this key through the index */
	ScanKeyInit(&key[0],
				Anum_pg_shdescription_objoid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(oid));
	ScanKeyInit(&key[1],
				Anum_pg_shdescription_classoid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(classoid));

	pg_shdesc = heap_open(SharedDescriptionRelationId, RowExclusiveLock);

	scan = systable_beginscan(pg_shdesc, SharedDescriptionObjIndexId, true,
							  SnapshotNow, 2, key);

	/* At most one row is expected, so only the first match is examined */
	existing = systable_getnext(scan);
	if (existing != NULL)
	{
		if (comment == NULL)
			simple_heap_delete(pg_shdesc, &existing->t_self);
		else
		{
			replacement = heap_modify_tuple(existing,
											RelationGetDescr(pg_shdesc),
											values, nulls, replaces);
			simple_heap_update(pg_shdesc, &existing->t_self, replacement);
		}
	}

	systable_endscan(scan);

	/* Nothing was there to update: add a brand-new row */
	if (replacement == NULL && comment != NULL)
	{
		replacement = heap_form_tuple(RelationGetDescr(pg_shdesc),
									  values, nulls);
		simple_heap_insert(pg_shdesc, replacement);
	}

	/* A row was inserted or updated, so its index entries must follow */
	if (replacement != NULL)
	{
		CatalogUpdateIndexes(pg_shdesc, replacement);
		heap_freetuple(replacement);
	}

	/* Close the catalog; NoLock means the lock is not given up here */
	heap_close(pg_shdesc, NoLock);
}

/*
 * DeleteComments -- remove comments for an object
 *
 * A nonzero subid restricts the deletion to comments carrying that subid.
 * A zero subid removes every comment matching oid/classoid, which is what
 * is wanted when the whole object is going away.
 */
void
DeleteComments(Oid oid, Oid classoid, int32 subid)
{
	Relation	pg_desc;
	ScanKeyData key[3];
	int			nkeys = 2;
	SysScanDesc scan;
	HeapTuple	victim;

	/* The first two keys always apply */
	ScanKeyInit(&key[0],
				Anum_pg_description_objoid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(oid));
	ScanKeyInit(&key[1],
				Anum_pg_description_classoid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(classoid));

	/* The sub-object key is added only when a specific subid was given */
	if (subid != 0)
	{
		ScanKeyInit(&key[2],
					Anum_pg_description_objsubid,
					BTEqualStrategyNumber, F_INT4EQ,
					Int32GetDatum(subid));
		nkeys = 3;
	}

	pg_desc = heap_open(DescriptionRelationId, RowExclusiveLock);

	scan = systable_beginscan(pg_desc, DescriptionObjIndexId, true,
							  SnapshotNow, nkeys, key);

	/* Delete every row the scan returns */
	for (;;)
	{
		victim = systable_getnext(scan);
		if (victim == NULL)
			break;
		simple_heap_delete(pg_desc, &victim->t_self);
	}

	/* Done */
	systable_endscan(scan);
	heap_close(pg_desc, RowExclusiveLock);
}

/*
 * DeleteSharedComments -- remove comments for a shared object
 */
void
DeleteSharedComments(Oid oid, Oid classoid)
{
	Relation	pg_shdesc;
	ScanKeyData key[2];
	SysScanDesc scan;
	HeapTuple	victim;

	/* Match on object OID and catalog OID through the index */
	ScanKeyInit(&key[0],
				Anum_pg_shdescription_objoid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(oid));
	ScanKeyInit(&key[1],
				Anum_pg_shdescription_classoid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(classoid));

	pg_shdesc = heap_open(SharedDescriptionRelationId, RowExclusiveLock);

	scan = systable_beginscan(pg_shdesc, SharedDescriptionObjIndexId, true,
							  SnapshotNow, 2, key);

	/* Delete every row the scan returns */
	for (;;)
	{
		victim = systable_getnext(scan);
		if (victim == NULL)
			break;
		simple_heap_delete(pg_shdesc, &victim->t_self);
	}

	/* Done */
	systable_endscan(scan);
	heap_close(pg_shdesc, RowExclusiveLock);
}

/*
 * GetComment -- get the comment for an object, or null if not found.
 *
 * NULL is also returned when a matching row exists but its description
 * column is null.
 */
char *
GetComment(Oid oid, Oid classoid, int32 subid)
{
	Relation	pg_desc;
	ScanKeyData key[3];
	SysScanDesc scan;
	TupleDesc	tupdesc;
	HeapTuple	tuple;
	char	   *result = NULL;

	/* Match on the full three-part key through the index */
	ScanKeyInit(&key[0],
				Anum_pg_description_objoid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(oid));
	ScanKeyInit(&key[1],
				Anum_pg_description_classoid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(classoid));
	ScanKeyInit(&key[2],
				Anum_pg_description_objsubid,
				BTEqualStrategyNumber, F_INT4EQ,
				Int32GetDatum(subid));

	pg_desc = heap_open(DescriptionRelationId, AccessShareLock);
	tupdesc = RelationGetDescr(pg_desc);

	scan = systable_beginscan(pg_desc, DescriptionObjIndexId, true,
							  SnapshotNow, 3, key);

	/* At most one row is expected, so only the first match is examined */
	tuple = systable_getnext(scan);
	if (tuple != NULL)
	{
		Datum		value;
		bool		isnull;

		/* Pull out the description column and copy it to a C string */
		value = heap_getattr(tuple, Anum_pg_description_description,
							 tupdesc, &isnull);
		if (!isnull)
			result = TextDatumGetCString(value);
	}

	systable_endscan(scan);

	/* Done */
	heap_close(pg_desc, AccessShareLock);

	return result;
}

/*
 * CheckAttributeComment --
 *
 * Verify that the current user may comment on a column of the given
 * relation: the user must own the relation, and the relation must be of a
 * kind for which column comments are permitted.
 */
static void
CheckAttributeComment(Relation relation)
{
	/* Ownership first */
	if (!pg_class_ownercheck(RelationGetRelid(relation), GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
					   RelationGetRelationName(relation));

	/*
	 * Column comments are accepted only for tables, views, composite types
	 * and foreign tables, which are the only relkinds for which pg_dump
	 * dumps per-column comments.  Index columns in particular are refused:
	 * their names may change across PG versions, so a dumped per-column
	 * comment could fail to reload.
	 */
	switch (relation->rd_rel->relkind)
	{
		case RELKIND_RELATION:
		case RELKIND_VIEW:
		case RELKIND_COMPOSITE_TYPE:
		case RELKIND_FOREIGN_TABLE:
			/* acceptable */
			break;

		default:
			ereport(ERROR,
					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
					 errmsg("\"%s\" is not a table, view, composite type, or foreign table",
							RelationGetRelationName(relation))));
			break;
	}
}

/*
 * CheckCastComment --
 *
 * Verify that the current user may comment on the cast whose source type
 * is the single TypeName in qualname and whose target type is the single
 * TypeName in arguments.  Owning either of the two types is sufficient.
 */
static void
CheckCastComment(List *qualname, List *arguments)
{
	TypeName   *srcname;
	TypeName   *dstname;
	Oid			srcoid;
	Oid			dstoid;

	/* Each list must hold exactly one TypeName node */
	Assert(list_length(qualname) == 1);
	srcname = (TypeName *) linitial(qualname);
	Assert(IsA(srcname, TypeName));

	Assert(list_length(arguments) == 1);
	dstname = (TypeName *) linitial(arguments);
	Assert(IsA(dstname, TypeName));

	srcoid = typenameTypeId(NULL, srcname);
	dstoid = typenameTypeId(NULL, dstname);

	/* Permission check: reject only if the user owns neither type */
	if (!(pg_type_ownercheck(srcoid, GetUserId()) ||
		  pg_type_ownercheck(dstoid, GetUserId())))
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("must be owner of type %s or type %s",
						format_type_be(srcoid),
						format_type_be(dstoid))));
}