Restructure get_object_address() so it's safe against concurrent DDL.

This gives a much better error message when the object of interest is
concurrently dropped and avoids needlessly failing when the object of
interest is concurrently dropped and recreated.  It also improves the
behavior of two concurrent DROP IF EXISTS operations targeted at the
same object; as before, one will drop the object, but now the other
will emit the usual NOTICE indicating that the object does not exist,
instead of rolling back.  As a fringe benefit, it's also slightly
less code.
This commit is contained in:
Robert Haas 2011-11-17 12:41:37 -05:00
parent 309411a69e
commit b3ad5d02c9
1 changed files with 207 additions and 216 deletions

View File

@ -59,6 +59,7 @@
#include "parser/parse_type.h" #include "parser/parse_type.h"
#include "rewrite/rewriteSupport.h" #include "rewrite/rewriteSupport.h"
#include "storage/lmgr.h" #include "storage/lmgr.h"
#include "storage/sinval.h"
#include "utils/acl.h" #include "utils/acl.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/fmgroids.h" #include "utils/fmgroids.h"
@ -248,7 +249,6 @@ static ObjectAddress get_object_address_type(ObjectType objtype,
List *objname, bool missing_ok); List *objname, bool missing_ok);
static ObjectAddress get_object_address_opcf(ObjectType objtype, List *objname, static ObjectAddress get_object_address_opcf(ObjectType objtype, List *objname,
List *objargs, bool missing_ok); List *objargs, bool missing_ok);
static bool object_exists(ObjectAddress address);
static ObjectPropertyType *get_object_property_data(Oid class_id); static ObjectPropertyType *get_object_property_data(Oid class_id);
/* /*
@ -276,166 +276,224 @@ get_object_address(ObjectType objtype, List *objname, List *objargs,
Relation *relp, LOCKMODE lockmode, bool missing_ok) Relation *relp, LOCKMODE lockmode, bool missing_ok)
{ {
ObjectAddress address; ObjectAddress address;
ObjectAddress old_address = {InvalidOid, InvalidOid, 0};
Relation relation = NULL; Relation relation = NULL;
uint64 inval_count;
/* Some kind of lock must be taken. */ /* Some kind of lock must be taken. */
Assert(lockmode != NoLock); Assert(lockmode != NoLock);
switch (objtype) for (;;)
{ {
case OBJECT_INDEX: /*
case OBJECT_SEQUENCE: * Remember this value, so that, after looking up the object name
case OBJECT_TABLE: * and locking it, we can check whether any invalidation messages
case OBJECT_VIEW: * have been processed that might require a do-over.
case OBJECT_FOREIGN_TABLE: */
address = inval_count = SharedInvalidMessageCounter;
get_relation_by_qualified_name(objtype, objname,
&relation, lockmode,
missing_ok);
break;
case OBJECT_COLUMN:
address =
get_object_address_attribute(objtype, objname,
&relation, lockmode,
missing_ok);
break;
case OBJECT_RULE:
case OBJECT_TRIGGER:
case OBJECT_CONSTRAINT:
address = get_object_address_relobject(objtype, objname,
&relation, missing_ok);
break;
case OBJECT_DATABASE:
case OBJECT_EXTENSION:
case OBJECT_TABLESPACE:
case OBJECT_ROLE:
case OBJECT_SCHEMA:
case OBJECT_LANGUAGE:
case OBJECT_FDW:
case OBJECT_FOREIGN_SERVER:
address = get_object_address_unqualified(objtype,
objname, missing_ok);
break;
case OBJECT_TYPE:
case OBJECT_DOMAIN:
address = get_object_address_type(objtype, objname, missing_ok);
break;
case OBJECT_AGGREGATE:
address.classId = ProcedureRelationId;
address.objectId =
LookupAggNameTypeNames(objname, objargs, missing_ok);
address.objectSubId = 0;
break;
case OBJECT_FUNCTION:
address.classId = ProcedureRelationId;
address.objectId =
LookupFuncNameTypeNames(objname, objargs, missing_ok);
address.objectSubId = 0;
break;
case OBJECT_OPERATOR:
Assert(list_length(objargs) == 2);
address.classId = OperatorRelationId;
address.objectId =
LookupOperNameTypeNames(NULL, objname,
(TypeName *) linitial(objargs),
(TypeName *) lsecond(objargs),
missing_ok, -1);
address.objectSubId = 0;
break;
case OBJECT_COLLATION:
address.classId = CollationRelationId;
address.objectId = get_collation_oid(objname, missing_ok);
address.objectSubId = 0;
break;
case OBJECT_CONVERSION:
address.classId = ConversionRelationId;
address.objectId = get_conversion_oid(objname, missing_ok);
address.objectSubId = 0;
break;
case OBJECT_OPCLASS:
case OBJECT_OPFAMILY:
address = get_object_address_opcf(objtype,
objname, objargs, missing_ok);
break;
case OBJECT_LARGEOBJECT:
Assert(list_length(objname) == 1);
address.classId = LargeObjectRelationId;
address.objectId = oidparse(linitial(objname));
address.objectSubId = 0;
if (!LargeObjectExists(address.objectId))
{
if (!missing_ok)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("large object %u does not exist",
address.objectId)));
}
break;
case OBJECT_CAST:
{
TypeName *sourcetype = (TypeName *) linitial(objname);
TypeName *targettype = (TypeName *) linitial(objargs);
Oid sourcetypeid = typenameTypeId(NULL, sourcetype);
Oid targettypeid = typenameTypeId(NULL, targettype);
address.classId = CastRelationId; /* Look up object address. */
switch (objtype)
{
case OBJECT_INDEX:
case OBJECT_SEQUENCE:
case OBJECT_TABLE:
case OBJECT_VIEW:
case OBJECT_FOREIGN_TABLE:
address =
get_relation_by_qualified_name(objtype, objname,
&relation, lockmode,
missing_ok);
break;
case OBJECT_COLUMN:
address =
get_object_address_attribute(objtype, objname,
&relation, lockmode,
missing_ok);
break;
case OBJECT_RULE:
case OBJECT_TRIGGER:
case OBJECT_CONSTRAINT:
address = get_object_address_relobject(objtype, objname,
&relation, missing_ok);
break;
case OBJECT_DATABASE:
case OBJECT_EXTENSION:
case OBJECT_TABLESPACE:
case OBJECT_ROLE:
case OBJECT_SCHEMA:
case OBJECT_LANGUAGE:
case OBJECT_FDW:
case OBJECT_FOREIGN_SERVER:
address = get_object_address_unqualified(objtype,
objname, missing_ok);
break;
case OBJECT_TYPE:
case OBJECT_DOMAIN:
address = get_object_address_type(objtype, objname, missing_ok);
break;
case OBJECT_AGGREGATE:
address.classId = ProcedureRelationId;
address.objectId = address.objectId =
get_cast_oid(sourcetypeid, targettypeid, missing_ok); LookupAggNameTypeNames(objname, objargs, missing_ok);
address.objectSubId = 0; address.objectSubId = 0;
break;
case OBJECT_FUNCTION:
address.classId = ProcedureRelationId;
address.objectId =
LookupFuncNameTypeNames(objname, objargs, missing_ok);
address.objectSubId = 0;
break;
case OBJECT_OPERATOR:
Assert(list_length(objargs) == 2);
address.classId = OperatorRelationId;
address.objectId =
LookupOperNameTypeNames(NULL, objname,
(TypeName *) linitial(objargs),
(TypeName *) lsecond(objargs),
missing_ok, -1);
address.objectSubId = 0;
break;
case OBJECT_COLLATION:
address.classId = CollationRelationId;
address.objectId = get_collation_oid(objname, missing_ok);
address.objectSubId = 0;
break;
case OBJECT_CONVERSION:
address.classId = ConversionRelationId;
address.objectId = get_conversion_oid(objname, missing_ok);
address.objectSubId = 0;
break;
case OBJECT_OPCLASS:
case OBJECT_OPFAMILY:
address = get_object_address_opcf(objtype,
objname, objargs, missing_ok);
break;
case OBJECT_LARGEOBJECT:
Assert(list_length(objname) == 1);
address.classId = LargeObjectRelationId;
address.objectId = oidparse(linitial(objname));
address.objectSubId = 0;
if (!LargeObjectExists(address.objectId))
{
if (!missing_ok)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("large object %u does not exist",
address.objectId)));
}
break;
case OBJECT_CAST:
{
TypeName *sourcetype = (TypeName *) linitial(objname);
TypeName *targettype = (TypeName *) linitial(objargs);
Oid sourcetypeid = typenameTypeId(NULL, sourcetype);
Oid targettypeid = typenameTypeId(NULL, targettype);
address.classId = CastRelationId;
address.objectId =
get_cast_oid(sourcetypeid, targettypeid, missing_ok);
address.objectSubId = 0;
}
break;
case OBJECT_TSPARSER:
address.classId = TSParserRelationId;
address.objectId = get_ts_parser_oid(objname, missing_ok);
address.objectSubId = 0;
break;
case OBJECT_TSDICTIONARY:
address.classId = TSDictionaryRelationId;
address.objectId = get_ts_dict_oid(objname, missing_ok);
address.objectSubId = 0;
break;
case OBJECT_TSTEMPLATE:
address.classId = TSTemplateRelationId;
address.objectId = get_ts_template_oid(objname, missing_ok);
address.objectSubId = 0;
break;
case OBJECT_TSCONFIGURATION:
address.classId = TSConfigRelationId;
address.objectId = get_ts_config_oid(objname, missing_ok);
address.objectSubId = 0;
break;
default:
elog(ERROR, "unrecognized objtype: %d", (int) objtype);
/* placate compiler, in case it thinks elog might return */
address.classId = InvalidOid;
address.objectId = InvalidOid;
address.objectSubId = 0;
}
/*
* If we could not find the supplied object, return without locking.
*/
if (!OidIsValid(address.objectId))
{
Assert(missing_ok);
return address;
}
/*
* If we're retrying, see if we got the same answer as last time. If
* so, we're done; if not, we locked the wrong thing, so give up our
* lock.
*/
if (OidIsValid(old_address.classId))
{
if (old_address.classId == address.classId
&& old_address.objectId == address.objectId
&& old_address.objectSubId == address.objectSubId)
break;
if (old_address.classId != RelationRelationId)
{
if (IsSharedRelation(old_address.classId))
UnlockSharedObject(old_address.classId,
old_address.objectId,
0, lockmode);
else
UnlockDatabaseObject(old_address.classId,
old_address.objectId,
0, lockmode);
} }
break; }
case OBJECT_TSPARSER:
address.classId = TSParserRelationId;
address.objectId = get_ts_parser_oid(objname, missing_ok);
address.objectSubId = 0;
break;
case OBJECT_TSDICTIONARY:
address.classId = TSDictionaryRelationId;
address.objectId = get_ts_dict_oid(objname, missing_ok);
address.objectSubId = 0;
break;
case OBJECT_TSTEMPLATE:
address.classId = TSTemplateRelationId;
address.objectId = get_ts_template_oid(objname, missing_ok);
address.objectSubId = 0;
break;
case OBJECT_TSCONFIGURATION:
address.classId = TSConfigRelationId;
address.objectId = get_ts_config_oid(objname, missing_ok);
address.objectSubId = 0;
break;
default:
elog(ERROR, "unrecognized objtype: %d", (int) objtype);
/* placate compiler, in case it thinks elog might return */
address.classId = InvalidOid;
address.objectId = InvalidOid;
address.objectSubId = 0;
}
/* /*
* If we could not find the supplied object, return without locking. * If we're dealing with a relation or attribute, then the relation is
*/ * already locked. Otherwise, we lock it now.
if (!OidIsValid(address.objectId)) */
{ if (address.classId != RelationRelationId)
Assert(missing_ok); {
return address; if (IsSharedRelation(address.classId))
} LockSharedObject(address.classId, address.objectId, 0,
lockmode);
else
LockDatabaseObject(address.classId, address.objectId, 0,
lockmode);
}
/* /*
* If we're dealing with a relation or attribute, then the relation is * At this point, we've resolved the name to an OID and locked the
* already locked. If we're dealing with any other type of object, we * corresponding database object. However, it's possible that by the
* need to lock it and then verify that it still exists. * time we acquire the lock on the object, concurrent DDL has modified
*/ * the database in such a way that the name we originally looked up
if (address.classId != RelationRelationId) * no longer resolves to that OID.
{ *
if (IsSharedRelation(address.classId)) * We can be certain that this isn't an issue if (a) no shared
LockSharedObject(address.classId, address.objectId, 0, lockmode); * invalidation messages have been processed or (b) we've locked a
else * relation somewhere along the line. All the relation name lookups
LockDatabaseObject(address.classId, address.objectId, 0, lockmode); * in this module ultimately use RangeVarGetRelid() to acquire a
/* Did it go away while we were waiting for the lock? */ * relation lock, and that function protects against the same kinds of
if (!object_exists(address)) * races we're worried about here. Even when operating on a
elog(ERROR, "cache lookup failed for class %u object %u subobj %d", * constraint, rule, or trigger, we still acquire AccessShareLock on
address.classId, address.objectId, address.objectSubId); * the relation, which is enough to freeze out any concurrent DDL.
*
* In all other cases, however, it's possible that the name we looked
* up no longer refers to the object we locked, so we retry the
* lookup and see whether we get the same answer.
*/
if (inval_count == SharedInvalidMessageCounter || relation != NULL)
break;
old_address = address;
} }
/* Return the object address and the relation. */ /* Return the object address and the relation. */
@ -843,73 +901,6 @@ get_object_address_opcf(ObjectType objtype,
return address; return address;
} }
/*
* Test whether an object exists.
*/
static bool
object_exists(ObjectAddress address)
{
int cache = -1;
Oid indexoid = InvalidOid;
Relation rel;
ScanKeyData skey[1];
SysScanDesc sd;
bool found;
ObjectPropertyType *property;
/* Sub-objects require special treatment. */
if (address.objectSubId != 0)
{
HeapTuple atttup;
/* Currently, attributes are the only sub-objects. */
Assert(address.classId == RelationRelationId);
atttup = SearchSysCache2(ATTNUM, ObjectIdGetDatum(address.objectId),
Int16GetDatum(address.objectSubId));
if (!HeapTupleIsValid(atttup))
found = false;
else
{
found = ((Form_pg_attribute) GETSTRUCT(atttup))->attisdropped;
ReleaseSysCache(atttup);
}
return found;
}
/*
* Weird backward compatibility hack: ObjectAddress notation uses
* LargeObjectRelationId for large objects, but since PostgreSQL
* 9.0, the relevant catalog is actually LargeObjectMetadataRelationId.
*/
if (address.classId == LargeObjectRelationId)
address.classId = LargeObjectMetadataRelationId;
/*
* For object types that have a relevant syscache, we use it; for
* everything else, we'll have to do an index-scan.
*/
property = get_object_property_data(address.classId);
cache = property->oid_catcache_id;
indexoid = property->oid_index_oid;
/* Found a syscache? */
if (cache != -1)
return SearchSysCacheExists1(cache, ObjectIdGetDatum(address.objectId));
/* No syscache, so examine the table directly. */
Assert(OidIsValid(indexoid));
ScanKeyInit(&skey[0],
ObjectIdAttributeNumber,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(address.objectId));
rel = heap_open(address.classId, AccessShareLock);
sd = systable_beginscan(rel, indexoid, true, SnapshotNow, 1, skey);
found = HeapTupleIsValid(systable_getnext(sd));
systable_endscan(sd);
heap_close(rel, AccessShareLock);
return found;
}
/* /*
* Check ownership of an object previously identified by get_object_address. * Check ownership of an object previously identified by get_object_address.
*/ */