Improve CREATE/DROP/RENAME DATABASE so that when failing because the source

or target database is being accessed by other users, it tells you whether
the "other users" are live sessions or uncommitted prepared transactions.
(Indeed, it tells you exactly how many of each, but that's mostly just
because it was easy to do so.)  This should help forestall the gotcha of
not realizing that a prepared transaction is what's blocking the command.
Per discussion.
This commit is contained in:
Tom Lane 2008-08-04 18:03:46 +00:00
parent ec73b56a31
commit 4abd7b49f1
3 changed files with 73 additions and 38 deletions

View File

@ -13,7 +13,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.209 2008/05/12 00:00:47 alvherre Exp $
* $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.210 2008/08/04 18:03:46 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -73,6 +73,7 @@ static bool get_db_info(const char *name, LOCKMODE lockmode,
static bool have_createdb_privilege(void);
static void remove_dbtablespaces(Oid db_id);
static bool check_db_file_conflict(Oid db_id);
static int errdetail_busy_db(int notherbackends, int npreparedxacts);
/*
@ -110,6 +111,8 @@ createdb(const CreatedbStmt *stmt)
int encoding = -1;
int dbconnlimit = -1;
int ctype_encoding;
int notherbackends;
int npreparedxacts;
createdb_failure_params fparms;
/* Extract options from the statement node tree */
@ -385,11 +388,12 @@ createdb(const CreatedbStmt *stmt)
* potential waiting; we may as well throw an error first if we're gonna
* throw one.
*/
if (CheckOtherDBBackends(src_dboid))
if (CountOtherDBBackends(src_dboid, &notherbackends, &npreparedxacts))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
errmsg("source database \"%s\" is being accessed by other users",
dbtemplate)));
dbtemplate),
errdetail_busy_db(notherbackends, npreparedxacts)));
/*
* Select an OID for the new database, checking that it doesn't have a
@ -612,6 +616,8 @@ dropdb(const char *dbname, bool missing_ok)
bool db_istemplate;
Relation pgdbrel;
HeapTuple tup;
int notherbackends;
int npreparedxacts;
/*
* Look up the target database's OID, and get exclusive lock on it. We
@ -671,11 +677,12 @@ dropdb(const char *dbname, bool missing_ok)
*
* As in CREATE DATABASE, check this after other error conditions.
*/
if (CheckOtherDBBackends(db_id))
if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
errmsg("database \"%s\" is being accessed by other users",
dbname)));
dbname),
errdetail_busy_db(notherbackends, npreparedxacts)));
/*
* Remove the database's tuple from pg_database.
@ -764,6 +771,8 @@ RenameDatabase(const char *oldname, const char *newname)
Oid db_id;
HeapTuple newtup;
Relation rel;
int notherbackends;
int npreparedxacts;
/*
* Look up the target database's OID, and get exclusive lock on it. We
@ -814,11 +823,12 @@ RenameDatabase(const char *oldname, const char *newname)
*
* As in CREATE DATABASE, check this after other error conditions.
*/
if (CheckOtherDBBackends(db_id))
if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
errmsg("database \"%s\" is being accessed by other users",
oldname)));
oldname),
errdetail_busy_db(notherbackends, npreparedxacts)));
/* rename */
newtup = SearchSysCacheCopy(DATABASEOID,
@ -1400,6 +1410,29 @@ check_db_file_conflict(Oid db_id)
return result;
}
/*
* Issue a suitable errdetail message for a busy database
*/
static int
errdetail_busy_db(int notherbackends, int npreparedxacts)
{
/*
* We don't worry about singular versus plural here, since the English
* rules for that don't translate very well. But we can at least avoid
* the case of zero items.
*/
if (notherbackends > 0 && npreparedxacts > 0)
errdetail("There are %d other session(s) and %d prepared transaction(s) using the database.",
notherbackends, npreparedxacts);
else if (notherbackends > 0)
errdetail("There are %d other session(s) using the database.",
notherbackends);
else
errdetail("There are %d prepared transaction(s) using the database.",
npreparedxacts);
return 0; /* just to keep ereport macro happy */
}
/*
* get_database_oid - given a database name, look up the OID
*

View File

@ -23,7 +23,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/ipc/procarray.c,v 1.45 2008/07/11 02:10:13 alvherre Exp $
* $PostgreSQL: pgsql/src/backend/storage/ipc/procarray.c,v 1.46 2008/08/04 18:03:46 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -1177,7 +1177,7 @@ CountUserBackends(Oid roleid)
}
/*
* CheckOtherDBBackends -- check for other backends running in the given DB
* CountOtherDBBackends -- check for other backends running in the given DB
*
* If there are other backends in the DB, we will wait a maximum of 5 seconds
* for them to exit. Autovacuum backends are encouraged to exit early by
@ -1187,6 +1187,8 @@ CountUserBackends(Oid roleid)
* check whether the current backend uses the given DB, if it's important.
*
* Returns TRUE if there are (still) other backends in the DB, FALSE if not.
* Also, *nbackends and *nprepared are set to the number of other backends
* and prepared transactions in the DB, respectively.
*
* This function is used to interlock DROP DATABASE and related commands
* against there being any active backends in the target DB --- dropping the
@ -1198,19 +1200,24 @@ CountUserBackends(Oid roleid)
* indefinitely.
*/
bool
CheckOtherDBBackends(Oid databaseId)
CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared)
{
ProcArrayStruct *arrayP = procArray;
#define MAXAUTOVACPIDS 10 /* max autovacs to SIGTERM per iteration */
int autovac_pids[MAXAUTOVACPIDS];
int tries;
/* 50 tries with 100ms sleep between tries makes 5 sec total wait */
for (tries = 0; tries < 50; tries++)
{
int nautovacs = 0;
bool found = false;
int index;
CHECK_FOR_INTERRUPTS();
*nbackends = *nprepared = 0;
LWLockAcquire(ProcArrayLock, LW_SHARED);
for (index = 0; index < arrayP->numProcs; index++)
@ -1224,38 +1231,32 @@ CheckOtherDBBackends(Oid databaseId)
found = true;
if (proc->vacuumFlags & PROC_IS_AUTOVACUUM)
{
/* an autovacuum --- send it SIGTERM before sleeping */
int autopid = proc->pid;
/*
* It's a bit awkward to release ProcArrayLock within the
* loop, but we'd probably better do so before issuing kill().
* We have no idea what might block kill() inside the
* kernel...
*/
LWLockRelease(ProcArrayLock);
(void) kill(autopid, SIGTERM); /* ignore any error */
break;
}
if (proc->pid == 0)
(*nprepared)++;
else
{
LWLockRelease(ProcArrayLock);
break;
(*nbackends)++;
if ((proc->vacuumFlags & PROC_IS_AUTOVACUUM) &&
nautovacs < MAXAUTOVACPIDS)
autovac_pids[nautovacs++] = proc->pid;
}
}
/* if found is set, we released the lock within the loop body */
if (!found)
{
LWLockRelease(ProcArrayLock);
return false; /* no conflicting backends, so done */
}
LWLockRelease(ProcArrayLock);
/* else sleep and try again */
if (!found)
return false; /* no conflicting backends, so done */
/*
* Send SIGTERM to any conflicting autovacuums before sleeping.
* We postpone this step until after the loop because we don't
* want to hold ProcArrayLock while issuing kill().
* We have no idea what might block kill() inside the kernel...
*/
for (index = 0; index < nautovacs; index++)
(void) kill(autovac_pids[index], SIGTERM); /* ignore any error */
/* sleep, then try again */
pg_usleep(100 * 1000L); /* 100ms */
}

View File

@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/storage/procarray.h,v 1.22 2008/05/12 20:02:02 alvherre Exp $
* $PostgreSQL: pgsql/src/include/storage/procarray.h,v 1.23 2008/08/04 18:03:46 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -44,7 +44,8 @@ extern VirtualTransactionId *GetCurrentVirtualXIDs(TransactionId limitXmin,
extern int CountActiveBackends(void);
extern int CountDBBackends(Oid databaseid);
extern int CountUserBackends(Oid roleid);
extern bool CheckOtherDBBackends(Oid databaseId);
extern bool CountOtherDBBackends(Oid databaseId,
int *nbackends, int *nprepared);
extern void XidCacheRemoveRunningXids(TransactionId xid,
int nxids, const TransactionId *xids,