diff --git a/src/backend/catalog/objectaddress.c b/src/backend/catalog/objectaddress.c index ec4c98769a..30873aa9a3 100644 --- a/src/backend/catalog/objectaddress.c +++ b/src/backend/catalog/objectaddress.c @@ -59,6 +59,7 @@ #include "parser/parse_type.h" #include "rewrite/rewriteSupport.h" #include "storage/lmgr.h" +#include "storage/sinval.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/fmgroids.h" @@ -248,7 +249,6 @@ static ObjectAddress get_object_address_type(ObjectType objtype, List *objname, bool missing_ok); static ObjectAddress get_object_address_opcf(ObjectType objtype, List *objname, List *objargs, bool missing_ok); -static bool object_exists(ObjectAddress address); static ObjectPropertyType *get_object_property_data(Oid class_id); /* @@ -276,166 +276,224 @@ get_object_address(ObjectType objtype, List *objname, List *objargs, Relation *relp, LOCKMODE lockmode, bool missing_ok) { ObjectAddress address; + ObjectAddress old_address = {InvalidOid, InvalidOid, 0}; Relation relation = NULL; + uint64 inval_count; /* Some kind of lock must be taken. */ Assert(lockmode != NoLock); - switch (objtype) + for (;;) { - case OBJECT_INDEX: - case OBJECT_SEQUENCE: - case OBJECT_TABLE: - case OBJECT_VIEW: - case OBJECT_FOREIGN_TABLE: - address = - get_relation_by_qualified_name(objtype, objname, - &relation, lockmode, - missing_ok); - break; - case OBJECT_COLUMN: - address = - get_object_address_attribute(objtype, objname, - &relation, lockmode, - missing_ok); - break; - case OBJECT_RULE: - case OBJECT_TRIGGER: - case OBJECT_CONSTRAINT: - address = get_object_address_relobject(objtype, objname, - &relation, missing_ok); - break; - case OBJECT_DATABASE: - case OBJECT_EXTENSION: - case OBJECT_TABLESPACE: - case OBJECT_ROLE: - case OBJECT_SCHEMA: - case OBJECT_LANGUAGE: - case OBJECT_FDW: - case OBJECT_FOREIGN_SERVER: - address = get_object_address_unqualified(objtype, - objname, missing_ok); - break; - case OBJECT_TYPE: - case OBJECT_DOMAIN: - address = get_object_address_type(objtype, objname, missing_ok); - break; - case OBJECT_AGGREGATE: - address.classId = ProcedureRelationId; - address.objectId = - LookupAggNameTypeNames(objname, objargs, missing_ok); - address.objectSubId = 0; - break; - case OBJECT_FUNCTION: - address.classId = ProcedureRelationId; - address.objectId = - LookupFuncNameTypeNames(objname, objargs, missing_ok); - address.objectSubId = 0; - break; - case OBJECT_OPERATOR: - Assert(list_length(objargs) == 2); - address.classId = OperatorRelationId; - address.objectId = - LookupOperNameTypeNames(NULL, objname, - (TypeName *) linitial(objargs), - (TypeName *) lsecond(objargs), - missing_ok, -1); - address.objectSubId = 0; - break; - case OBJECT_COLLATION: - address.classId = CollationRelationId; - address.objectId = get_collation_oid(objname, missing_ok); - address.objectSubId = 0; - break; - case OBJECT_CONVERSION: - address.classId = ConversionRelationId; - address.objectId = get_conversion_oid(objname, missing_ok); - address.objectSubId = 0; - break; - case OBJECT_OPCLASS: - case OBJECT_OPFAMILY: - address = get_object_address_opcf(objtype, - objname, objargs, missing_ok); - break; - case OBJECT_LARGEOBJECT: - Assert(list_length(objname) == 1); - address.classId = LargeObjectRelationId; - address.objectId = oidparse(linitial(objname)); - address.objectSubId = 0; - if (!LargeObjectExists(address.objectId)) - { - if (!missing_ok) - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_OBJECT), - errmsg("large object %u does not exist", - address.objectId))); - } - break; - case OBJECT_CAST: - { - TypeName *sourcetype = (TypeName *) linitial(objname); - TypeName *targettype = (TypeName *) linitial(objargs); - Oid sourcetypeid = typenameTypeId(NULL, sourcetype); - Oid targettypeid = typenameTypeId(NULL, targettype); + /* + * Remember this value, so that, after looking up the object name + * and locking it, we can check whether any invalidation messages + * have been processed that might require a do-over. + */ + inval_count = SharedInvalidMessageCounter; - address.classId = CastRelationId; + /* Look up object address. */ + switch (objtype) + { + case OBJECT_INDEX: + case OBJECT_SEQUENCE: + case OBJECT_TABLE: + case OBJECT_VIEW: + case OBJECT_FOREIGN_TABLE: + address = + get_relation_by_qualified_name(objtype, objname, + &relation, lockmode, + missing_ok); + break; + case OBJECT_COLUMN: + address = + get_object_address_attribute(objtype, objname, + &relation, lockmode, + missing_ok); + break; + case OBJECT_RULE: + case OBJECT_TRIGGER: + case OBJECT_CONSTRAINT: + address = get_object_address_relobject(objtype, objname, + &relation, missing_ok); + break; + case OBJECT_DATABASE: + case OBJECT_EXTENSION: + case OBJECT_TABLESPACE: + case OBJECT_ROLE: + case OBJECT_SCHEMA: + case OBJECT_LANGUAGE: + case OBJECT_FDW: + case OBJECT_FOREIGN_SERVER: + address = get_object_address_unqualified(objtype, + objname, missing_ok); + break; + case OBJECT_TYPE: + case OBJECT_DOMAIN: + address = get_object_address_type(objtype, objname, missing_ok); + break; + case OBJECT_AGGREGATE: + address.classId = ProcedureRelationId; address.objectId = - get_cast_oid(sourcetypeid, targettypeid, missing_ok); + LookupAggNameTypeNames(objname, objargs, missing_ok); address.objectSubId = 0; + break; + case OBJECT_FUNCTION: + address.classId = ProcedureRelationId; + address.objectId = + LookupFuncNameTypeNames(objname, objargs, missing_ok); + address.objectSubId = 0; + break; + case OBJECT_OPERATOR: + Assert(list_length(objargs) == 2); + address.classId = OperatorRelationId; + address.objectId = + LookupOperNameTypeNames(NULL, objname, + (TypeName *) linitial(objargs), + (TypeName *) lsecond(objargs), + missing_ok, -1); + address.objectSubId = 0; + break; + case OBJECT_COLLATION: + address.classId = CollationRelationId; + address.objectId = get_collation_oid(objname, missing_ok); + address.objectSubId = 0; + break; + case OBJECT_CONVERSION: + address.classId = ConversionRelationId; + address.objectId = get_conversion_oid(objname, missing_ok); + address.objectSubId = 0; + break; + case OBJECT_OPCLASS: + case OBJECT_OPFAMILY: + address = get_object_address_opcf(objtype, + objname, objargs, missing_ok); + break; + case OBJECT_LARGEOBJECT: + Assert(list_length(objname) == 1); + address.classId = LargeObjectRelationId; + address.objectId = oidparse(linitial(objname)); + address.objectSubId = 0; + if (!LargeObjectExists(address.objectId)) + { + if (!missing_ok) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("large object %u does not exist", + address.objectId))); + } + break; + case OBJECT_CAST: + { + TypeName *sourcetype = (TypeName *) linitial(objname); + TypeName *targettype = (TypeName *) linitial(objargs); + Oid sourcetypeid = typenameTypeId(NULL, sourcetype); + Oid targettypeid = typenameTypeId(NULL, targettype); + + address.classId = CastRelationId; + address.objectId = + get_cast_oid(sourcetypeid, targettypeid, missing_ok); + address.objectSubId = 0; + } + break; + case OBJECT_TSPARSER: + address.classId = TSParserRelationId; + address.objectId = get_ts_parser_oid(objname, missing_ok); + address.objectSubId = 0; + break; + case OBJECT_TSDICTIONARY: + address.classId = TSDictionaryRelationId; + address.objectId = get_ts_dict_oid(objname, missing_ok); + address.objectSubId = 0; + break; + case OBJECT_TSTEMPLATE: + address.classId = TSTemplateRelationId; + address.objectId = get_ts_template_oid(objname, missing_ok); + address.objectSubId = 0; + break; + case OBJECT_TSCONFIGURATION: + address.classId = TSConfigRelationId; + address.objectId = get_ts_config_oid(objname, missing_ok); + address.objectSubId = 0; + break; + default: + elog(ERROR, "unrecognized objtype: %d", (int) objtype); + /* placate compiler, in case it thinks elog might return */ + address.classId = InvalidOid; + address.objectId = InvalidOid; + address.objectSubId = 0; + } + + /* + * If we could not find the supplied object, return without locking. + */ + if (!OidIsValid(address.objectId)) + { + Assert(missing_ok); + return address; + } + + /* + * If we're retrying, see if we got the same answer as last time. If + * so, we're done; if not, we locked the wrong thing, so give up our + * lock. + */ + if (OidIsValid(old_address.classId)) + { + if (old_address.classId == address.classId + && old_address.objectId == address.objectId + && old_address.objectSubId == address.objectSubId) + break; + if (old_address.classId != RelationRelationId) + { + if (IsSharedRelation(old_address.classId)) + UnlockSharedObject(old_address.classId, + old_address.objectId, + 0, lockmode); + else + UnlockDatabaseObject(old_address.classId, + old_address.objectId, + 0, lockmode); } - break; - case OBJECT_TSPARSER: - address.classId = TSParserRelationId; - address.objectId = get_ts_parser_oid(objname, missing_ok); - address.objectSubId = 0; - break; - case OBJECT_TSDICTIONARY: - address.classId = TSDictionaryRelationId; - address.objectId = get_ts_dict_oid(objname, missing_ok); - address.objectSubId = 0; - break; - case OBJECT_TSTEMPLATE: - address.classId = TSTemplateRelationId; - address.objectId = get_ts_template_oid(objname, missing_ok); - address.objectSubId = 0; - break; - case OBJECT_TSCONFIGURATION: - address.classId = TSConfigRelationId; - address.objectId = get_ts_config_oid(objname, missing_ok); - address.objectSubId = 0; - break; - default: - elog(ERROR, "unrecognized objtype: %d", (int) objtype); - /* placate compiler, in case it thinks elog might return */ - address.classId = InvalidOid; - address.objectId = InvalidOid; - address.objectSubId = 0; - } + } - /* - * If we could not find the supplied object, return without locking. - */ - if (!OidIsValid(address.objectId)) - { - Assert(missing_ok); - return address; - } + /* + * If we're dealing with a relation or attribute, then the relation is + * already locked. Otherwise, we lock it now. + */ + if (address.classId != RelationRelationId) + { + if (IsSharedRelation(address.classId)) + LockSharedObject(address.classId, address.objectId, 0, + lockmode); + else + LockDatabaseObject(address.classId, address.objectId, 0, + lockmode); + } - /* - * If we're dealing with a relation or attribute, then the relation is - * already locked. If we're dealing with any other type of object, we - * need to lock it and then verify that it still exists. - */ - if (address.classId != RelationRelationId) - { - if (IsSharedRelation(address.classId)) - LockSharedObject(address.classId, address.objectId, 0, lockmode); - else - LockDatabaseObject(address.classId, address.objectId, 0, lockmode); - /* Did it go away while we were waiting for the lock? */ - if (!object_exists(address)) - elog(ERROR, "cache lookup failed for class %u object %u subobj %d", - address.classId, address.objectId, address.objectSubId); + /* + * At this point, we've resolved the name to an OID and locked the + * corresponding database object. However, it's possible that by the + * time we acquire the lock on the object, concurrent DDL has modified + * the database in such a way that the name we originally looked up + * no longer resolves to that OID. + * + * We can be certain that this isn't an issue if (a) no shared + * invalidation messages have been processed or (b) we've locked a + * relation somewhere along the line. All the relation name lookups + * in this module ultimately use RangeVarGetRelid() to acquire a + * relation lock, and that function protects against the same kinds of + * races we're worried about here. Even when operating on a + * constraint, rule, or trigger, we still acquire AccessShareLock on + * the relation, which is enough to freeze out any concurrent DDL. + * + * In all other cases, however, it's possible that the name we looked + * up no longer refers to the object we locked, so we retry the + * lookup and see whether we get the same answer. + */ + if (inval_count == SharedInvalidMessageCounter || relation != NULL) + break; + old_address = address; } /* Return the object address and the relation. */ @@ -843,73 +901,6 @@ get_object_address_opcf(ObjectType objtype, return address; } -/* - * Test whether an object exists. - */ -static bool -object_exists(ObjectAddress address) -{ - int cache = -1; - Oid indexoid = InvalidOid; - Relation rel; - ScanKeyData skey[1]; - SysScanDesc sd; - bool found; - ObjectPropertyType *property; - - /* Sub-objects require special treatment. */ - if (address.objectSubId != 0) - { - HeapTuple atttup; - - /* Currently, attributes are the only sub-objects. */ - Assert(address.classId == RelationRelationId); - atttup = SearchSysCache2(ATTNUM, ObjectIdGetDatum(address.objectId), - Int16GetDatum(address.objectSubId)); - if (!HeapTupleIsValid(atttup)) - found = false; - else - { - found = ((Form_pg_attribute) GETSTRUCT(atttup))->attisdropped; - ReleaseSysCache(atttup); - } - return found; - } - - /* - * Weird backward compatibility hack: ObjectAddress notation uses - * LargeObjectRelationId for large objects, but since PostgreSQL - * 9.0, the relevant catalog is actually LargeObjectMetadataRelationId. - */ - if (address.classId == LargeObjectRelationId) - address.classId = LargeObjectMetadataRelationId; - - /* - * For object types that have a relevant syscache, we use it; for - * everything else, we'll have to do an index-scan. - */ - property = get_object_property_data(address.classId); - cache = property->oid_catcache_id; - indexoid = property->oid_index_oid; - - /* Found a syscache? */ - if (cache != -1) - return SearchSysCacheExists1(cache, ObjectIdGetDatum(address.objectId)); - - /* No syscache, so examine the table directly. */ - Assert(OidIsValid(indexoid)); - ScanKeyInit(&skey[0], - ObjectIdAttributeNumber, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(address.objectId)); - rel = heap_open(address.classId, AccessShareLock); - sd = systable_beginscan(rel, indexoid, true, SnapshotNow, 1, skey); - found = HeapTupleIsValid(systable_getnext(sd)); - systable_endscan(sd); - heap_close(rel, AccessShareLock); - return found; -} - /* * Check ownership of an object previously identified by get_object_address. */