Add assertions that we hold some relevant lock during relation open.

Opening a relation with no lock at all is unsafe; there's no guarantee
that we'll see a consistent state of the relevant catalog entries.
While use of MVCC scans to read the catalogs partially addresses that
complaint, it's still possible to switch to a new catalog snapshot
partway through loading the relcache entry.  Moreover, whether or not
you trust the reasoning behind sometimes using less than
AccessExclusiveLock for ALTER TABLE, that reasoning is certainly not
valid if concurrent users of the table don't hold a lock corresponding
to the operation they want to perform.

Hence, add some assertion-build-only checks that require any caller
of relation_open(x, NoLock) to hold at least AccessShareLock.  This
isn't a full solution, since we can't verify that the lock level is
semantically appropriate for the action --- but it's definitely of
some use, because it's already caught two bugs.

We can also assert that callers of addRangeTableEntryForRelation()
hold at least the lock level specified for the new RTE.

Amit Langote and Tom Lane

Discussion: https://postgr.es/m/16565.1538327894@sss.pgh.pa.us
This commit is contained in:
Tom Lane 2018-10-01 12:43:21 -04:00
parent b66827ca7c
commit b04aeb0a05
7 changed files with 95 additions and 3 deletions

View File

@ -1137,6 +1137,14 @@ relation_open(Oid relationId, LOCKMODE lockmode)
if (!RelationIsValid(r))
elog(ERROR, "could not open relation with OID %u", relationId);
/*
* If we didn't get the lock ourselves, assert that caller holds one,
* except in bootstrap mode where no locks are used.
*/
Assert(lockmode != NoLock ||
IsBootstrapProcessingMode() ||
CheckRelationLockedByMe(r, AccessShareLock, true));
/* Make note that we've accessed a temporary relation */
if (RelationUsesLocalBuffers(r))
MyXactFlags |= XACT_FLAGS_ACCESSEDTEMPREL;
@ -1183,6 +1191,10 @@ try_relation_open(Oid relationId, LOCKMODE lockmode)
if (!RelationIsValid(r))
elog(ERROR, "could not open relation with OID %u", relationId);
/* If we didn't get the lock ourselves, assert that caller holds one */
Assert(lockmode != NoLock ||
CheckRelationLockedByMe(r, AccessShareLock, true));
/* Make note that we've accessed a temporary relation */
if (RelationUsesLocalBuffers(r))
MyXactFlags |= XACT_FLAGS_ACCESSEDTEMPREL;
@ -8084,6 +8096,8 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_changed, bool *
idx_rel = RelationIdGetRelation(replidindex);
Assert(CheckRelationLockedByMe(idx_rel, AccessShareLock, true));
/* deform tuple, so we have fast access to columns */
heap_deform_tuple(tp, desc, values, nulls);

View File

@ -28,6 +28,7 @@
#include "parser/parse_enr.h"
#include "parser/parse_relation.h"
#include "parser/parse_type.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
@ -1272,7 +1273,7 @@ addRangeTableEntry(ParseState *pstate,
*
* lockmode is the lock type required for query execution; it must be one
* of AccessShareLock, RowShareLock, or RowExclusiveLock depending on the
* RTE's role within the query. The caller should always hold that lock mode
* RTE's role within the query. The caller must hold that lock mode
* or a stronger one.
*
* Note: properly, lockmode should be declared LOCKMODE not int, but that
@ -1295,6 +1296,7 @@ addRangeTableEntryForRelation(ParseState *pstate,
Assert(lockmode == AccessShareLock ||
lockmode == RowShareLock ||
lockmode == RowExclusiveLock);
Assert(CheckRelationLockedByMe(rel, lockmode, true));
rte->rtekind = RTE_RELATION;
rte->alias = alias;

View File

@ -287,6 +287,51 @@ UnlockRelation(Relation relation, LOCKMODE lockmode)
LockRelease(&tag, lockmode, false);
}
/*
* CheckRelationLockedByMe
*
* Returns true if current transaction holds a lock on 'relation' of mode
* 'lockmode'. If 'orstronger' is true, a stronger lockmode is also OK.
* ("Stronger" is defined as "numerically higher", which is a bit
* semantically dubious but is OK for the purposes we use this for.)
*/
bool
CheckRelationLockedByMe(Relation relation, LOCKMODE lockmode, bool orstronger)
{
LOCKTAG tag;
SET_LOCKTAG_RELATION(tag,
relation->rd_lockInfo.lockRelId.dbId,
relation->rd_lockInfo.lockRelId.relId);
if (LockHeldByMe(&tag, lockmode))
return true;
if (orstronger)
{
LOCKMODE slockmode;
for (slockmode = lockmode + 1;
slockmode <= MaxLockMode;
slockmode++)
{
if (LockHeldByMe(&tag, slockmode))
{
#ifdef NOT_USED
/* Sometimes this might be useful for debugging purposes */
elog(WARNING, "lock mode %s substituted for %s on relation %s",
GetLockmodeName(tag.locktag_lockmethodid, slockmode),
GetLockmodeName(tag.locktag_lockmethodid, lockmode),
RelationGetRelationName(relation));
#endif
return true;
}
}
}
return false;
}
/*
* LockHasWaitersRelation
*

View File

@ -563,6 +563,30 @@ DoLockModesConflict(LOCKMODE mode1, LOCKMODE mode2)
return false;
}
/*
* LockHeldByMe -- test whether lock 'locktag' is held with mode 'lockmode'
* by the current transaction
*/
bool
LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode)
{
LOCALLOCKTAG localtag;
LOCALLOCK *locallock;
/*
* See if there is a LOCALLOCK entry for this lock and lockmode
*/
MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
localtag.lock = *locktag;
localtag.mode = lockmode;
locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
(void *) &localtag,
HASH_FIND, NULL);
return (locallock && locallock->nLocks > 0);
}
/*
* LockHasWaiters -- look up 'locktag' and check if releasing this
* lock would wake up other processes waiting for it.

View File

@ -45,6 +45,8 @@ extern void UnlockRelationOid(Oid relid, LOCKMODE lockmode);
extern void LockRelation(Relation relation, LOCKMODE lockmode);
extern bool ConditionalLockRelation(Relation relation, LOCKMODE lockmode);
extern void UnlockRelation(Relation relation, LOCKMODE lockmode);
extern bool CheckRelationLockedByMe(Relation relation, LOCKMODE lockmode,
bool orstronger);
extern bool LockHasWaitersRelation(Relation relation, LOCKMODE lockmode);
extern void LockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode);

View File

@ -540,6 +540,7 @@ extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks);
extern void LockReleaseSession(LOCKMETHODID lockmethodid);
extern void LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks);
extern void LockReassignCurrentOwner(LOCALLOCK **locallocks, int nlocks);
extern bool LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode);
extern bool LockHasWaiters(const LOCKTAG *locktag,
LOCKMODE lockmode, bool sessionLock);
extern VirtualTransactionId *GetLockConflicts(const LOCKTAG *locktag,

View File

@ -45,11 +45,15 @@ typedef int LOCKMODE;
#define AccessExclusiveLock 8 /* ALTER TABLE, DROP TABLE, VACUUM FULL,
* and unqualified LOCK TABLE */
#define MaxLockMode 8
/* WAL representation of an AccessExclusiveLock on a table */
typedef struct xl_standby_lock
{
TransactionId xid; /* xid of holder of AccessExclusiveLock */
Oid dbOid;
Oid relOid;
Oid dbOid; /* DB containing table */
Oid relOid; /* OID of table */
} xl_standby_lock;
#endif /* LOCKDEF_H_ */