Now that we've rearranged relation open to get a lock before touching

the rel, it's easy to get rid of the narrow race-condition window that
used to exist in VACUUM and CLUSTER.  Did some minor code-beautification
work in the same area, too.
This commit is contained in:
Tom Lane 2006-08-18 16:09:13 +00:00
parent e91600d1c2
commit 7aa772f03e
8 changed files with 153 additions and 91 deletions

View File

@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.218 2006/07/31 20:08:59 tgl Exp $ * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.219 2006/08/18 16:09:08 tgl Exp $
* *
* *
* INTERFACE ROUTINES * INTERFACE ROUTINES
@ -53,6 +53,7 @@
#include "utils/inval.h" #include "utils/inval.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
#include "utils/relcache.h" #include "utils/relcache.h"
#include "utils/syscache.h"
static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf, static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
@ -702,14 +703,56 @@ relation_open(Oid relationId, LOCKMODE lockmode)
} }
/* ---------------- /* ----------------
* conditional_relation_open - open with option not to wait * try_relation_open - open any relation by relation OID
* *
* As above, but if nowait is true, then throw an error rather than * Same as relation_open, except return NULL instead of failing
* waiting when the lock is not immediately obtainable. * if the relation does not exist.
* ---------------- * ----------------
*/ */
Relation Relation
conditional_relation_open(Oid relationId, LOCKMODE lockmode, bool nowait) try_relation_open(Oid relationId, LOCKMODE lockmode)
{
Relation r;
Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);
/* Get the lock first */
if (lockmode != NoLock)
LockRelationOid(relationId, lockmode);
/*
* Now that we have the lock, probe to see if the relation really
* exists or not.
*/
if (!SearchSysCacheExists(RELOID,
ObjectIdGetDatum(relationId),
0, 0, 0))
{
/* Release useless lock */
if (lockmode != NoLock)
UnlockRelationOid(relationId, lockmode);
return NULL;
}
/* Should be safe to do a relcache load */
r = RelationIdGetRelation(relationId);
if (!RelationIsValid(r))
elog(ERROR, "could not open relation with OID %u", relationId);
return r;
}
/* ----------------
* relation_open_nowait - open but don't wait for lock
*
* Same as relation_open, except throw an error instead of waiting
* when the requested lock is not immediately obtainable.
* ----------------
*/
Relation
relation_open_nowait(Oid relationId, LOCKMODE lockmode)
{ {
Relation r; Relation r;
@ -718,27 +761,22 @@ conditional_relation_open(Oid relationId, LOCKMODE lockmode, bool nowait)
/* Get the lock before trying to open the relcache entry */ /* Get the lock before trying to open the relcache entry */
if (lockmode != NoLock) if (lockmode != NoLock)
{ {
if (nowait) if (!ConditionalLockRelationOid(relationId, lockmode))
{ {
if (!ConditionalLockRelationOid(relationId, lockmode)) /* try to throw error by name; relation could be deleted... */
{ char *relname = get_rel_name(relationId);
/* try to throw error by name; relation could be deleted... */
char *relname = get_rel_name(relationId);
if (relname) if (relname)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_LOCK_NOT_AVAILABLE), (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
errmsg("could not obtain lock on relation \"%s\"", errmsg("could not obtain lock on relation \"%s\"",
relname))); relname)));
else else
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_LOCK_NOT_AVAILABLE), (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
errmsg("could not obtain lock on relation with OID %u", errmsg("could not obtain lock on relation with OID %u",
relationId))); relationId)));
}
} }
else
LockRelationOid(relationId, lockmode);
} }
/* The relcache does all the real work... */ /* The relcache does all the real work... */
@ -753,7 +791,7 @@ conditional_relation_open(Oid relationId, LOCKMODE lockmode, bool nowait)
/* ---------------- /* ----------------
* relation_openrv - open any relation specified by a RangeVar * relation_openrv - open any relation specified by a RangeVar
* *
* As above, but the relation is specified by a RangeVar. * Same as relation_open, but the relation is specified by a RangeVar.
* ---------------- * ----------------
*/ */
Relation Relation

View File

@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/analyze.c,v 1.96 2006/07/14 14:52:18 momjian Exp $ * $PostgreSQL: pgsql/src/backend/commands/analyze.c,v 1.97 2006/08/18 16:09:08 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -129,20 +129,16 @@ analyze_rel(Oid relid, VacuumStmt *vacstmt)
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
/* /*
* Race condition -- if the pg_class tuple has gone away since the last * Open the relation, getting only a read lock on it. If the rel has
* time we saw it, we don't need to process it. * been dropped since we last saw it, we don't need to process it.
*/ */
if (!SearchSysCacheExists(RELOID, onerel = try_relation_open(relid, AccessShareLock);
ObjectIdGetDatum(relid), if (!onerel)
0, 0, 0))
return; return;
/* /*
* Open the class, getting only a read lock on it, and check permissions. * Check permissions --- this should match vacuum's check!
* Permissions check should match vacuum's check!
*/ */
onerel = relation_open(relid, AccessShareLock);
if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) || if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
(pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared))) (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
{ {

View File

@ -11,7 +11,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/cluster.c,v 1.152 2006/07/31 20:09:00 tgl Exp $ * $PostgreSQL: pgsql/src/backend/commands/cluster.c,v 1.153 2006/08/18 16:09:08 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -239,6 +239,18 @@ cluster_rel(RelToCluster *rvtc, bool recheck)
/* Check for user-requested abort. */ /* Check for user-requested abort. */
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
/*
* We grab exclusive access to the target rel and index for the duration
* of the transaction. (This is redundant for the single-transaction
* case, since cluster() already did it.) The index lock is taken inside
* check_index_is_clusterable.
*/
OldHeap = try_relation_open(rvtc->tableOid, AccessExclusiveLock);
/* If the table has gone away, we can skip processing it */
if (!OldHeap)
return;
/* /*
* Since we may open a new transaction for each relation, we have to check * Since we may open a new transaction for each relation, we have to check
* that the relation still is what we think it is. * that the relation still is what we think it is.
@ -252,20 +264,23 @@ cluster_rel(RelToCluster *rvtc, bool recheck)
HeapTuple tuple; HeapTuple tuple;
Form_pg_index indexForm; Form_pg_index indexForm;
/*
* Check if the relation and index still exist before opening them
*/
if (!SearchSysCacheExists(RELOID,
ObjectIdGetDatum(rvtc->tableOid),
0, 0, 0) ||
!SearchSysCacheExists(RELOID,
ObjectIdGetDatum(rvtc->indexOid),
0, 0, 0))
return;
/* Check that the user still owns the relation */ /* Check that the user still owns the relation */
if (!pg_class_ownercheck(rvtc->tableOid, GetUserId())) if (!pg_class_ownercheck(rvtc->tableOid, GetUserId()))
{
relation_close(OldHeap, AccessExclusiveLock);
return; return;
}
/*
* Check that the index still exists
*/
if (!SearchSysCacheExists(RELOID,
ObjectIdGetDatum(rvtc->indexOid),
0, 0, 0))
{
relation_close(OldHeap, AccessExclusiveLock);
return;
}
/* /*
* Check that the index is still the one with indisclustered set. * Check that the index is still the one with indisclustered set.
@ -273,25 +288,21 @@ cluster_rel(RelToCluster *rvtc, bool recheck)
tuple = SearchSysCache(INDEXRELID, tuple = SearchSysCache(INDEXRELID,
ObjectIdGetDatum(rvtc->indexOid), ObjectIdGetDatum(rvtc->indexOid),
0, 0, 0); 0, 0, 0);
if (!HeapTupleIsValid(tuple)) if (!HeapTupleIsValid(tuple)) /* probably can't happen */
return; /* could have gone away... */ {
relation_close(OldHeap, AccessExclusiveLock);
return;
}
indexForm = (Form_pg_index) GETSTRUCT(tuple); indexForm = (Form_pg_index) GETSTRUCT(tuple);
if (!indexForm->indisclustered) if (!indexForm->indisclustered)
{ {
ReleaseSysCache(tuple); ReleaseSysCache(tuple);
relation_close(OldHeap, AccessExclusiveLock);
return; return;
} }
ReleaseSysCache(tuple); ReleaseSysCache(tuple);
} }
/*
* We grab exclusive access to the target rel and index for the duration
* of the transaction. (This is redundant for the single- transaction
* case, since cluster() already did it.) The index lock is taken inside
* check_index_is_clusterable.
*/
OldHeap = heap_open(rvtc->tableOid, AccessExclusiveLock);
/* Check index is valid to cluster on */ /* Check index is valid to cluster on */
check_index_is_clusterable(OldHeap, rvtc->indexOid, recheck); check_index_is_clusterable(OldHeap, rvtc->indexOid, recheck);

View File

@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/lockcmds.c,v 1.14 2006/03/05 15:58:24 momjian Exp $ * $PostgreSQL: pgsql/src/backend/commands/lockcmds.c,v 1.15 2006/08/18 16:09:08 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -59,7 +59,10 @@ LockTableCommand(LockStmt *lockstmt)
aclcheck_error(aclresult, ACL_KIND_CLASS, aclcheck_error(aclresult, ACL_KIND_CLASS,
get_rel_name(reloid)); get_rel_name(reloid));
rel = conditional_relation_open(reloid, lockstmt->mode, lockstmt->nowait); if (lockstmt->nowait)
rel = relation_open_nowait(reloid, lockstmt->mode);
else
rel = relation_open(reloid, lockstmt->mode);
/* Currently, we only allow plain tables to be locked */ /* Currently, we only allow plain tables to be locked */
if (rel->rd_rel->relkind != RELKIND_RELATION) if (rel->rd_rel->relkind != RELKIND_RELATION)

View File

@ -13,7 +13,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.337 2006/07/31 20:09:00 tgl Exp $ * $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.338 2006/08/18 16:09:08 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -1041,31 +1041,12 @@ vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind)
MyProc->inVacuum = true; MyProc->inVacuum = true;
} }
/*
* Tell the cache replacement strategy that vacuum is causing all
* following IO
*/
StrategyHintVacuum(true);
/* /*
* Check for user-requested abort. Note we want this to be inside a * Check for user-requested abort. Note we want this to be inside a
* transaction, so xact.c doesn't issue useless WARNING. * transaction, so xact.c doesn't issue useless WARNING.
*/ */
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
/*
* Race condition -- if the pg_class tuple has gone away since the last
* time we saw it, we don't need to vacuum it.
*/
if (!SearchSysCacheExists(RELOID,
ObjectIdGetDatum(relid),
0, 0, 0))
{
StrategyHintVacuum(false);
CommitTransactionCommand();
return;
}
/* /*
* Determine the type of lock we want --- hard exclusive lock for a FULL * Determine the type of lock we want --- hard exclusive lock for a FULL
* vacuum, but just ShareUpdateExclusiveLock for concurrent vacuum. Either * vacuum, but just ShareUpdateExclusiveLock for concurrent vacuum. Either
@ -1074,7 +1055,21 @@ vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind)
lmode = vacstmt->full ? AccessExclusiveLock : ShareUpdateExclusiveLock; lmode = vacstmt->full ? AccessExclusiveLock : ShareUpdateExclusiveLock;
/* /*
* Open the class, get an appropriate lock on it, and check permissions. * Open the relation and get the appropriate lock on it.
*
* There's a race condition here: the rel may have gone away since
* the last time we saw it. If so, we don't need to vacuum it.
*/
onerel = try_relation_open(relid, lmode);
if (!onerel)
{
CommitTransactionCommand();
return;
}
/*
* Check permissions.
* *
* We allow the user to vacuum a table if he is superuser, the table * We allow the user to vacuum a table if he is superuser, the table
* owner, or the database owner (but in the latter case, only if it's not * owner, or the database owner (but in the latter case, only if it's not
@ -1083,8 +1078,6 @@ vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind)
* Note we choose to treat permissions failure as a WARNING and keep * Note we choose to treat permissions failure as a WARNING and keep
* trying to vacuum the rest of the DB --- is this appropriate? * trying to vacuum the rest of the DB --- is this appropriate?
*/ */
onerel = relation_open(relid, lmode);
if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) || if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
(pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared))) (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
{ {
@ -1092,7 +1085,6 @@ vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind)
(errmsg("skipping \"%s\" --- only table or database owner can vacuum it", (errmsg("skipping \"%s\" --- only table or database owner can vacuum it",
RelationGetRelationName(onerel)))); RelationGetRelationName(onerel))));
relation_close(onerel, lmode); relation_close(onerel, lmode);
StrategyHintVacuum(false);
CommitTransactionCommand(); CommitTransactionCommand();
return; return;
} }
@ -1107,7 +1099,6 @@ vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind)
(errmsg("skipping \"%s\" --- cannot vacuum indexes, views, or special system tables", (errmsg("skipping \"%s\" --- cannot vacuum indexes, views, or special system tables",
RelationGetRelationName(onerel)))); RelationGetRelationName(onerel))));
relation_close(onerel, lmode); relation_close(onerel, lmode);
StrategyHintVacuum(false);
CommitTransactionCommand(); CommitTransactionCommand();
return; return;
} }
@ -1122,7 +1113,6 @@ vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind)
if (isOtherTempNamespace(RelationGetNamespace(onerel))) if (isOtherTempNamespace(RelationGetNamespace(onerel)))
{ {
relation_close(onerel, lmode); relation_close(onerel, lmode);
StrategyHintVacuum(false);
CommitTransactionCommand(); CommitTransactionCommand();
return; /* assume no long-lived data in temp tables */ return; /* assume no long-lived data in temp tables */
} }
@ -1145,6 +1135,12 @@ vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind)
*/ */
toast_relid = onerel->rd_rel->reltoastrelid; toast_relid = onerel->rd_rel->reltoastrelid;
/*
* Tell the cache replacement strategy that vacuum is causing all
* following IO
*/
StrategyHintVacuum(true);
/* /*
* Do the actual work --- either FULL or "lazy" vacuum * Do the actual work --- either FULL or "lazy" vacuum
*/ */
@ -1153,13 +1149,14 @@ vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind)
else else
lazy_vacuum_rel(onerel, vacstmt); lazy_vacuum_rel(onerel, vacstmt);
StrategyHintVacuum(false);
/* all done with this class, but hold lock until commit */ /* all done with this class, but hold lock until commit */
relation_close(onerel, NoLock); relation_close(onerel, NoLock);
/* /*
* Complete the transaction and free all temporary memory used. * Complete the transaction and free all temporary memory used.
*/ */
StrategyHintVacuum(false);
CommitTransactionCommand(); CommitTransactionCommand();
/* /*

View File

@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/lmgr/lmgr.c,v 1.86 2006/07/31 20:09:05 tgl Exp $ * $PostgreSQL: pgsql/src/backend/storage/lmgr/lmgr.c,v 1.87 2006/08/18 16:09:09 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -128,8 +128,8 @@ ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode)
/* /*
* UnlockRelationId * UnlockRelationId
* *
* Note: we don't supply UnlockRelationOid since it's normally easy for * Unlock, given a LockRelId. This is preferred over UnlockRelationOid
* callers to provide the LockRelId info from a relcache entry. * for speed reasons.
*/ */
void void
UnlockRelationId(LockRelId *relid, LOCKMODE lockmode) UnlockRelationId(LockRelId *relid, LOCKMODE lockmode)
@ -141,6 +141,21 @@ UnlockRelationId(LockRelId *relid, LOCKMODE lockmode)
LockRelease(&tag, lockmode, false); LockRelease(&tag, lockmode, false);
} }
/*
* UnlockRelationOid
*
* Unlock, given only a relation Oid. Use UnlockRelationId if you can.
*/
void
UnlockRelationOid(Oid relid, LOCKMODE lockmode)
{
LOCKTAG tag;
SetLocktagRelationOid(&tag, relid);
LockRelease(&tag, lockmode, false);
}
/* /*
* LockRelation * LockRelation
* *

View File

@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/include/access/heapam.h,v 1.114 2006/07/03 22:45:39 tgl Exp $ * $PostgreSQL: pgsql/src/include/access/heapam.h,v 1.115 2006/08/18 16:09:10 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -129,7 +129,8 @@ typedef enum
} LockTupleMode; } LockTupleMode;
extern Relation relation_open(Oid relationId, LOCKMODE lockmode); extern Relation relation_open(Oid relationId, LOCKMODE lockmode);
extern Relation conditional_relation_open(Oid relationId, LOCKMODE lockmode, bool nowait); extern Relation try_relation_open(Oid relationId, LOCKMODE lockmode);
extern Relation relation_open_nowait(Oid relationId, LOCKMODE lockmode);
extern Relation relation_openrv(const RangeVar *relation, LOCKMODE lockmode); extern Relation relation_openrv(const RangeVar *relation, LOCKMODE lockmode);
extern void relation_close(Relation relation, LOCKMODE lockmode); extern void relation_close(Relation relation, LOCKMODE lockmode);

View File

@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/include/storage/lmgr.h,v 1.55 2006/07/31 20:09:05 tgl Exp $ * $PostgreSQL: pgsql/src/include/storage/lmgr.h,v 1.56 2006/08/18 16:09:13 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -24,6 +24,7 @@ extern void RelationInitLockInfo(Relation relation);
extern void LockRelationOid(Oid relid, LOCKMODE lockmode); extern void LockRelationOid(Oid relid, LOCKMODE lockmode);
extern bool ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode); extern bool ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode);
extern void UnlockRelationId(LockRelId *relid, LOCKMODE lockmode); extern void UnlockRelationId(LockRelId *relid, LOCKMODE lockmode);
extern void UnlockRelationOid(Oid relid, LOCKMODE lockmode);
extern void LockRelation(Relation relation, LOCKMODE lockmode); extern void LockRelation(Relation relation, LOCKMODE lockmode);
extern bool ConditionalLockRelation(Relation relation, LOCKMODE lockmode); extern bool ConditionalLockRelation(Relation relation, LOCKMODE lockmode);