postgresql/src/backend/commands/dbcommands.c

1382 lines
38 KiB
C
Raw Normal View History

/*-------------------------------------------------------------------------
*
* dbcommands.c
Clean up various to-do items associated with system indexes: pg_database now has unique indexes on oid and on datname. pg_shadow now has unique indexes on usename and on usesysid. pg_am now has unique index on oid. pg_opclass now has unique index on oid. pg_amproc now has unique index on amid+amopclaid+amprocnum. Remove pg_rewrite's unnecessary index on oid, delete unused RULEOID syscache. Remove index on pg_listener and associated syscache for performance reasons (caching rows that are certain to change before you need 'em again is rather pointless). Change pg_attrdef's nonunique index on adrelid into a unique index on adrelid+adnum. Fix various incorrect settings of pg_class.relisshared, make that the primary reference point for whether a relation is shared or not. IsSharedSystemRelationName() is now only consulted to initialize relisshared during initial creation of tables and indexes. In theory we might now support shared user relations, though it's not clear how one would get entries for them into pg_class &etc of multiple databases. Fix recently reported bug that pg_attribute rows created for an index all have the same OID. (Proof that non-unique OID doesn't matter unless it's actually used to do lookups ;-)) There's no need to treat pg_trigger, pg_attrdef, pg_relcheck as bootstrap relations. Convert them into plain system catalogs without hardwired entries in pg_class and friends. Unify global.bki and template1.bki into a single init script postgres.bki, since the alleged distinction between them was misleading and pointless. Not to mention that it didn't work for setting up indexes on shared system relations. Rationalize locking of pg_shadow, pg_group, pg_attrdef (no need to use AccessExclusiveLock where ExclusiveLock or even RowExclusiveLock will do). Also, hold locks until transaction commit where necessary.
2001-06-12 07:55:50 +02:00
* Database management commands (create/drop database).
*
* Note: database creation/destruction commands take ExclusiveLock on
* pg_database to ensure that no two proceed in parallel. We must use
* at least this level of locking to ensure that no two backends try to
* write the flat-file copy of pg_database at once. We avoid using
* AccessExclusiveLock since there's no need to lock out ordinary readers
* of pg_database.
*
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.155 2005/03/23 00:03:28 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
2003-06-27 16:45:32 +02:00
#include "access/genam.h"
#include "access/heapam.h"
#include "catalog/catname.h"
#include "catalog/catalog.h"
#include "catalog/pg_database.h"
#include "catalog/pg_shadow.h"
#include "catalog/pg_tablespace.h"
Clean up various to-do items associated with system indexes: pg_database now has unique indexes on oid and on datname. pg_shadow now has unique indexes on usename and on usesysid. pg_am now has unique index on oid. pg_opclass now has unique index on oid. pg_amproc now has unique index on amid+amopclaid+amprocnum. Remove pg_rewrite's unnecessary index on oid, delete unused RULEOID syscache. Remove index on pg_listener and associated syscache for performance reasons (caching rows that are certain to change before you need 'em again is rather pointless). Change pg_attrdef's nonunique index on adrelid into a unique index on adrelid+adnum. Fix various incorrect settings of pg_class.relisshared, make that the primary reference point for whether a relation is shared or not. IsSharedSystemRelationName() is now only consulted to initialize relisshared during initial creation of tables and indexes. In theory we might now support shared user relations, though it's not clear how one would get entries for them into pg_class &etc of multiple databases. Fix recently reported bug that pg_attribute rows created for an index all have the same OID. (Proof that non-unique OID doesn't matter unless it's actually used to do lookups ;-)) There's no need to treat pg_trigger, pg_attrdef, pg_relcheck as bootstrap relations. Convert them into plain system catalogs without hardwired entries in pg_class and friends. Unify global.bki and template1.bki into a single init script postgres.bki, since the alleged distinction between them was misleading and pointless. Not to mention that it didn't work for setting up indexes on shared system relations. Rationalize locking of pg_shadow, pg_group, pg_attrdef (no need to use AccessExclusiveLock where ExclusiveLock or even RowExclusiveLock will do). Also, hold locks until transaction commit where necessary.
2001-06-12 07:55:50 +02:00
#include "catalog/indexing.h"
#include "commands/comment.h"
#include "commands/dbcommands.h"
#include "commands/tablespace.h"
#include "mb/pg_wchar.h"
1999-07-16 05:14:30 +02:00
#include "miscadmin.h"
#include "postmaster/bgwriter.h"
#include "storage/fd.h"
#include "storage/freespace.h"
#include "storage/sinval.h"
2003-06-27 16:45:32 +02:00
#include "utils/acl.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/flatfiles.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
/* non-export function prototypes */
static bool get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
Oid *dbLastSysOidP,
TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
Oid *dbTablespace);
static bool have_createdb_privilege(void);
static void remove_dbtablespaces(Oid db_id);
/*
* CREATE DATABASE
*/
void
createdb(const CreatedbStmt *stmt)
{
HeapScanDesc scan;
Relation rel;
Oid src_dboid;
AclId src_owner;
int src_encoding;
bool src_istemplate;
bool src_allowconn;
Oid src_lastsysoid;
TransactionId src_vacuumxid;
TransactionId src_frozenxid;
Oid src_deftablespace;
Oid dst_deftablespace;
Relation pg_database_rel;
HeapTuple tuple;
TupleDesc pg_database_dsc;
Datum new_record[Natts_pg_database];
char new_record_nulls[Natts_pg_database];
Oid dboid;
AclId datdba;
ListCell *option;
2004-08-29 07:07:03 +02:00
DefElem *dtablespacename = NULL;
DefElem *downer = NULL;
2002-09-04 22:31:48 +02:00
DefElem *dtemplate = NULL;
DefElem *dencoding = NULL;
char *dbname = stmt->dbname;
char *dbowner = NULL;
char *dbtemplate = NULL;
2002-09-04 22:31:48 +02:00
int encoding = -1;
2004-08-29 07:07:03 +02:00
#ifndef WIN32
char buf[2 * MAXPGPATH + 100];
#endif
/* don't call this in a transaction block */
PreventTransactionChain((void *) stmt, "CREATE DATABASE");
/* Extract options from the statement node tree */
foreach(option, stmt->options)
{
DefElem *defel = (DefElem *) lfirst(option);
if (strcmp(defel->defname, "tablespace") == 0)
{
if (dtablespacename)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
dtablespacename = defel;
}
else if (strcmp(defel->defname, "owner") == 0)
{
if (downer)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
downer = defel;
}
else if (strcmp(defel->defname, "template") == 0)
{
if (dtemplate)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
dtemplate = defel;
}
else if (strcmp(defel->defname, "encoding") == 0)
{
if (dencoding)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
dencoding = defel;
}
else if (strcmp(defel->defname, "location") == 0)
{
ereport(WARNING,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("LOCATION is not supported anymore"),
errhint("Consider using tablespaces instead.")));
}
else
elog(ERROR, "option \"%s\" not recognized",
defel->defname);
}
if (downer && downer->arg)
dbowner = strVal(downer->arg);
if (dtemplate && dtemplate->arg)
dbtemplate = strVal(dtemplate->arg);
if (dencoding && dencoding->arg)
{
const char *encoding_name;
if (IsA(dencoding->arg, Integer))
{
encoding = intVal(dencoding->arg);
encoding_name = pg_encoding_to_char(encoding);
if (strcmp(encoding_name, "") == 0 ||
pg_valid_server_encoding(encoding_name) < 0)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("%d is not a valid encoding code",
encoding)));
}
else if (IsA(dencoding->arg, String))
{
encoding_name = strVal(dencoding->arg);
if (pg_valid_server_encoding(encoding_name) < 0)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("%s is not a valid encoding name",
encoding_name)));
encoding = pg_char_to_encoding(encoding_name);
}
else
elog(ERROR, "unrecognized node type: %d",
nodeTag(dencoding->arg));
}
/* obtain sysid of proposed owner */
if (dbowner)
datdba = get_usesysid(dbowner); /* will ereport if no such user */
else
datdba = GetUserId();
if (datdba == GetUserId())
{
/* creating database for self: can be superuser or createdb */
if (!superuser() && !have_createdb_privilege())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied to create database")));
}
else
{
/* creating database for someone else: must be superuser */
/* note that the someone else need not have any permissions */
if (!superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to create database for another user")));
}
/*
2001-03-22 05:01:46 +01:00
* Check for db name conflict. There is a race condition here, since
* another backend could create the same DB name before we commit.
2001-03-22 05:01:46 +01:00
* However, holding an exclusive lock on pg_database for the whole
* time we are copying the source database doesn't seem like a good
* idea, so accept possibility of race to create. We will check again
* after we grab the exclusive lock.
*/
if (get_db_info(dbname, NULL, NULL, NULL,
NULL, NULL, NULL, NULL, NULL, NULL))
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_DATABASE),
errmsg("database \"%s\" already exists", dbname)));
/*
* Lookup database (template) to be cloned.
*/
if (!dbtemplate)
2001-03-22 05:01:46 +01:00
dbtemplate = "template1"; /* Default template database name */
if (!get_db_info(dbtemplate, &src_dboid, &src_owner, &src_encoding,
&src_istemplate, &src_allowconn, &src_lastsysoid,
&src_vacuumxid, &src_frozenxid, &src_deftablespace))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_DATABASE),
2004-08-29 07:07:03 +02:00
errmsg("template database \"%s\" does not exist", dbtemplate)));
2001-03-22 05:01:46 +01:00
/*
2001-03-22 05:01:46 +01:00
* Permission check: to copy a DB that's not marked datistemplate, you
* must be superuser or the owner thereof.
*/
if (!src_istemplate)
{
2002-09-04 22:31:48 +02:00
if (!superuser() && GetUserId() != src_owner)
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied to copy database \"%s\"",
dbtemplate)));
}
2001-03-22 05:01:46 +01:00
/*
* The source DB can't have any active backends, except this one
* (exception is to allow CREATE DB while connected to template1).
* Otherwise we might copy inconsistent data. This check is not
* bulletproof, since someone might connect while we are copying...
*/
if (DatabaseHasActiveBackends(src_dboid, true))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
2003-08-04 02:43:34 +02:00
errmsg("source database \"%s\" is being accessed by other users",
dbtemplate)));
/* If encoding is defaulted, use source's encoding */
if (encoding < 0)
encoding = src_encoding;
Commit Karel's patch. ------------------------------------------------------------------- Subject: Re: [PATCHES] encoding names From: Karel Zak <zakkr@zf.jcu.cz> To: Peter Eisentraut <peter_e@gmx.net> Cc: pgsql-patches <pgsql-patches@postgresql.org> Date: Fri, 31 Aug 2001 17:24:38 +0200 On Thu, Aug 30, 2001 at 01:30:40AM +0200, Peter Eisentraut wrote: > > - convert encoding 'name' to 'id' > > I thought we decided not to add functions returning "new" names until we > know exactly what the new names should be, and pending schema Ok, the patch not to add functions. > better > > ...(): encoding name too long Fixed. I found new bug in command/variable.c in parse_client_encoding(), nobody probably never see this error: if (pg_set_client_encoding(encoding)) { elog(ERROR, "Conversion between %s and %s is not supported", value, GetDatabaseEncodingName()); } because pg_set_client_encoding() returns -1 for error and 0 as true. It's fixed too. IMHO it can be apply. Karel PS: * following files are renamed: src/utils/mb/Unicode/KOI8_to_utf8.map --> src/utils/mb/Unicode/koi8r_to_utf8.map src/utils/mb/Unicode/WIN_to_utf8.map --> src/utils/mb/Unicode/win1251_to_utf8.map src/utils/mb/Unicode/utf8_to_KOI8.map --> src/utils/mb/Unicode/utf8_to_koi8r.map src/utils/mb/Unicode/utf8_to_WIN.map --> src/utils/mb/Unicode/utf8_to_win1251.map * new file: src/utils/mb/encname.c * removed file: src/utils/mb/common.c -- Karel Zak <zakkr@zf.jcu.cz> http://home.zf.jcu.cz/~zakkr/ C, PostgreSQL, PHP, WWW, http://docs.linux.cz, http://mape.jcu.cz
2001-09-06 06:57:30 +02:00
/* Some encodings are client only */
if (!PG_VALID_BE_ENCODING(encoding))
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("invalid server encoding %d", encoding)));
/* Resolve default tablespace for new database */
if (dtablespacename && dtablespacename->arg)
{
char *tablespacename;
2004-08-29 07:07:03 +02:00
AclResult aclresult;
tablespacename = strVal(dtablespacename->arg);
dst_deftablespace = get_tablespace_oid(tablespacename);
if (!OidIsValid(dst_deftablespace))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("tablespace \"%s\" does not exist",
tablespacename)));
/* check permissions */
2004-08-29 07:07:03 +02:00
aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),
ACL_CREATE);
2004-08-29 07:07:03 +02:00
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
tablespacename);
/*
* If we are trying to change the default tablespace of the template,
* we require that the template not have any files in the new default
* tablespace. This is necessary because otherwise the copied
* database would contain pg_class rows that refer to its default
* tablespace both explicitly (by OID) and implicitly (as zero), which
* would cause problems. For example another CREATE DATABASE using
* the copied database as template, and trying to change its default
* tablespace again, would yield outright incorrect results (it would
* improperly move tables to the new default tablespace that should
* stay in the same tablespace).
*/
if (dst_deftablespace != src_deftablespace)
{
char *srcpath;
struct stat st;
srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
if (stat(srcpath, &st) == 0 &&
S_ISDIR(st.st_mode) &&
!directory_is_empty(srcpath))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot assign new default tablespace \"%s\"",
tablespacename),
errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
dbtemplate)));
pfree(srcpath);
}
}
else
{
/* Use template database's default tablespace */
dst_deftablespace = src_deftablespace;
/* Note there is no additional permission check in this path */
}
/*
* Normally we mark the new database with the same datvacuumxid and
* datfrozenxid as the source. However, if the source is not allowing
* connections then we assume it is fully frozen, and we can set the
* current transaction ID as the xid limits. This avoids immediately
* starting to generate warnings after cloning template0.
*/
if (!src_allowconn)
src_vacuumxid = src_frozenxid = GetCurrentTransactionId();
2001-03-22 05:01:46 +01:00
/*
* Preassign OID for pg_database tuple, so that we can compute db
* path.
*/
dboid = newoid();
/*
* Force dirty buffers out to disk, to ensure source database is
2001-03-22 05:01:46 +01:00
* up-to-date for the copy. (We really only need to flush buffers for
* the source database, but bufmgr.c provides no API for that.)
*/
BufferSync();
/*
* Close virtual file descriptors so the kernel has more available for
* the system() calls below.
*/
closeAllVfds();
/*
2004-08-29 07:07:03 +02:00
* Iterate through all tablespaces of the template database, and copy
* each one to the new database.
*/
rel = heap_openr(TableSpaceRelationName, AccessShareLock);
scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
2004-08-29 07:07:03 +02:00
Oid srctablespace = HeapTupleGetOid(tuple);
Oid dsttablespace;
char *srcpath;
char *dstpath;
struct stat st;
/* No need to copy global tablespace */
if (srctablespace == GLOBALTABLESPACE_OID)
continue;
srcpath = GetDatabasePath(src_dboid, srctablespace);
if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
directory_is_empty(srcpath))
{
/* Assume we can ignore it */
pfree(srcpath);
continue;
}
if (srctablespace == src_deftablespace)
dsttablespace = dst_deftablespace;
else
dsttablespace = srctablespace;
dstpath = GetDatabasePath(dboid, dsttablespace);
if (stat(dstpath, &st) == 0 || errno != ENOENT)
{
remove_dbtablespaces(dboid);
ereport(ERROR,
(errmsg("could not initialize database directory"),
errdetail("Directory \"%s\" already exists.",
dstpath)));
}
2003-04-04 22:40:45 +02:00
#ifndef WIN32
2004-08-29 07:07:03 +02:00
/*
* Copy this subdirectory to the new location
*
* XXX use of cp really makes this code pretty grotty, particularly
* with respect to lack of ability to report errors well. Someday
* rewrite to do it for ourselves.
*/
/* We might need to use cp -R one day for portability */
snprintf(buf, sizeof(buf), "cp -r '%s' '%s'",
srcpath, dstpath);
if (system(buf) != 0)
{
remove_dbtablespaces(dboid);
ereport(ERROR,
(errmsg("could not initialize database directory"),
errdetail("Failing system command was: %s", buf),
errhint("Look in the postmaster's stderr log for more information.")));
}
2004-08-29 07:07:03 +02:00
#else /* WIN32 */
if (copydir(srcpath, dstpath) != 0)
{
/* copydir should already have given details of its troubles */
remove_dbtablespaces(dboid);
ereport(ERROR,
(errmsg("could not initialize database directory")));
}
2004-08-29 07:07:03 +02:00
#endif /* WIN32 */
/* Record the filesystem change in XLOG */
{
xl_dbase_create_rec xlrec;
XLogRecData rdata[1];
xlrec.db_id = dboid;
xlrec.tablespace_id = dsttablespace;
xlrec.src_db_id = src_dboid;
xlrec.src_tablespace_id = srctablespace;
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char *) &xlrec;
rdata[0].len = sizeof(xl_dbase_create_rec);
rdata[0].next = NULL;
(void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata);
}
}
heap_endscan(scan);
heap_close(rel, AccessShareLock);
/*
* Now OK to grab exclusive lock on pg_database.
*/
pg_database_rel = heap_openr(DatabaseRelationName, ExclusiveLock);
/* Check to see if someone else created same DB name meanwhile. */
if (get_db_info(dbname, NULL, NULL, NULL,
NULL, NULL, NULL, NULL, NULL, NULL))
{
/* Don't hold lock while doing recursive remove */
heap_close(pg_database_rel, ExclusiveLock);
remove_dbtablespaces(dboid);
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_DATABASE),
errmsg("database \"%s\" already exists", dbname)));
}
/*
* Insert a new tuple into pg_database
*/
pg_database_dsc = RelationGetDescr(pg_database_rel);
/* Form tuple */
MemSet(new_record, 0, sizeof(new_record));
MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
new_record[Anum_pg_database_datname - 1] =
DirectFunctionCall1(namein, CStringGetDatum(dbname));
new_record[Anum_pg_database_datdba - 1] = Int32GetDatum(datdba);
new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid);
new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
2002-09-04 22:31:48 +02:00
/*
* We deliberately set datconfig and datacl to defaults (NULL), rather
* than copying them from the template database. Copying datacl would
2002-09-04 22:31:48 +02:00
* be a bad idea when the owner is not the same as the template's
* owner. It's more debatable whether datconfig should be copied.
*/
new_record_nulls[Anum_pg_database_datconfig - 1] = 'n';
new_record_nulls[Anum_pg_database_datacl - 1] = 'n';
tuple = heap_formtuple(pg_database_dsc, new_record, new_record_nulls);
HeapTupleSetOid(tuple, dboid); /* override heap_insert's OID
2001-03-22 05:01:46 +01:00
* selection */
simple_heap_insert(pg_database_rel, tuple);
/* Update indexes */
CatalogUpdateIndexes(pg_database_rel, tuple);
/* Close pg_database, but keep exclusive lock till commit */
heap_close(pg_database_rel, NoLock);
/*
* Set flag to update flat database file at commit.
*/
database_file_update_needed();
}
/*
* DROP DATABASE
*/
void
dropdb(const char *dbname)
{
int4 db_owner;
bool db_istemplate;
Oid db_id;
Relation pgdbrel;
2003-08-04 02:43:34 +02:00
SysScanDesc pgdbscan;
ScanKeyData key;
HeapTuple tup;
PreventTransactionChain((void *) dbname, "DROP DATABASE");
AssertArg(dbname);
2003-06-27 16:45:32 +02:00
if (strcmp(dbname, get_database_name(MyDatabaseId)) == 0)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
errmsg("cannot drop the currently open database")));
/*
* Obtain exclusive lock on pg_database. We need this to ensure that
* no new backend starts up in the target database while we are
* deleting it. (Actually, a new backend might still manage to start
* up, because it isn't able to lock pg_database while starting. But
* it will detect its error in ReverifyMyDatabase and shut down before
* any serious damage is done. See postinit.c.)
*
* An ExclusiveLock, rather than AccessExclusiveLock, is sufficient
* since ReverifyMyDatabase takes RowShareLock. This allows ordinary
* readers of pg_database to proceed in parallel.
*/
pgdbrel = heap_openr(DatabaseRelationName, ExclusiveLock);
if (!get_db_info(dbname, &db_id, &db_owner, NULL,
&db_istemplate, NULL, NULL, NULL, NULL, NULL))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database \"%s\" does not exist", dbname)));
if (GetUserId() != db_owner && !superuser())
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
dbname);
/*
2001-03-22 05:01:46 +01:00
* Disallow dropping a DB that is marked istemplate. This is just to
* prevent people from accidentally dropping template0 or template1;
* they can do so if they're really determined ...
*/
if (db_istemplate)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot drop a template database")));
/*
* Check for active backends in the target database.
*/
if (DatabaseHasActiveBackends(db_id, false))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
2003-08-04 02:43:34 +02:00
errmsg("database \"%s\" is being accessed by other users",
dbname)));
/*
Clean up various to-do items associated with system indexes: pg_database now has unique indexes on oid and on datname. pg_shadow now has unique indexes on usename and on usesysid. pg_am now has unique index on oid. pg_opclass now has unique index on oid. pg_amproc now has unique index on amid+amopclaid+amprocnum. Remove pg_rewrite's unnecessary index on oid, delete unused RULEOID syscache. Remove index on pg_listener and associated syscache for performance reasons (caching rows that are certain to change before you need 'em again is rather pointless). Change pg_attrdef's nonunique index on adrelid into a unique index on adrelid+adnum. Fix various incorrect settings of pg_class.relisshared, make that the primary reference point for whether a relation is shared or not. IsSharedSystemRelationName() is now only consulted to initialize relisshared during initial creation of tables and indexes. In theory we might now support shared user relations, though it's not clear how one would get entries for them into pg_class &etc of multiple databases. Fix recently reported bug that pg_attribute rows created for an index all have the same OID. (Proof that non-unique OID doesn't matter unless it's actually used to do lookups ;-)) There's no need to treat pg_trigger, pg_attrdef, pg_relcheck as bootstrap relations. Convert them into plain system catalogs without hardwired entries in pg_class and friends. Unify global.bki and template1.bki into a single init script postgres.bki, since the alleged distinction between them was misleading and pointless. Not to mention that it didn't work for setting up indexes on shared system relations. Rationalize locking of pg_shadow, pg_group, pg_attrdef (no need to use AccessExclusiveLock where ExclusiveLock or even RowExclusiveLock will do). Also, hold locks until transaction commit where necessary.
2001-06-12 07:55:50 +02:00
* Find the database's tuple by OID (should be unique).
*/
ScanKeyInit(&key,
ObjectIdAttributeNumber,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(db_id));
pgdbscan = systable_beginscan(pgdbrel, DatabaseOidIndex, true,
SnapshotNow, 1, &key);
2003-06-27 16:45:32 +02:00
tup = systable_getnext(pgdbscan);
if (!HeapTupleIsValid(tup))
{
/*
* This error should never come up since the existence of the
* database is checked earlier
*/
elog(ERROR, "database \"%s\" doesn't exist despite earlier reports to the contrary",
dbname);
}
/* Remove the database's tuple from pg_database */
simple_heap_delete(pgdbrel, &tup->t_self);
2003-06-27 16:45:32 +02:00
systable_endscan(pgdbscan);
/*
* Delete any comments associated with the database
*
* NOTE: this is probably dead code since any such comments should have
* been in that database, not mine.
*/
DeleteComments(db_id, RelationGetRelid(pgdbrel), 0);
/*
* Drop pages for this database that are in the shared buffer cache.
* This is important to ensure that no remaining backend tries to
* write out a dirty buffer to the dead database later...
*/
DropBuffers(db_id);
/*
* Also, clean out any entries in the shared free space map.
*/
FreeSpaceMapForgetDatabase(db_id);
/*
* On Windows, force a checkpoint so that the bgwriter doesn't hold any
* open files, which would cause rmdir() to fail.
*/
#ifdef WIN32
RequestCheckpoint(true);
#endif
/*
* Remove all tablespace subdirs belonging to the database.
*/
remove_dbtablespaces(db_id);
/* Close pg_database, but keep exclusive lock till commit */
heap_close(pgdbrel, NoLock);
/*
* Set flag to update flat database file at commit.
*/
database_file_update_needed();
}
2003-06-27 16:45:32 +02:00
/*
* Rename database
*/
void
RenameDatabase(const char *oldname, const char *newname)
{
2003-08-04 02:43:34 +02:00
HeapTuple tup,
newtup;
2003-06-27 16:45:32 +02:00
Relation rel;
2003-08-04 02:43:34 +02:00
SysScanDesc scan,
scan2;
ScanKeyData key,
key2;
2003-06-27 16:45:32 +02:00
/*
* Obtain ExclusiveLock so that no new session gets started
2003-06-27 16:45:32 +02:00
* while the rename is in progress.
*/
rel = heap_openr(DatabaseRelationName, ExclusiveLock);
2003-06-27 16:45:32 +02:00
ScanKeyInit(&key,
Anum_pg_database_datname,
BTEqualStrategyNumber, F_NAMEEQ,
NameGetDatum(oldname));
scan = systable_beginscan(rel, DatabaseNameIndex, true,
SnapshotNow, 1, &key);
2003-06-27 16:45:32 +02:00
tup = systable_getnext(scan);
if (!HeapTupleIsValid(tup))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_DATABASE),
2003-06-27 16:45:32 +02:00
errmsg("database \"%s\" does not exist", oldname)));
/*
* XXX Client applications probably store the current database
* somewhere, so renaming it could cause confusion. On the other
* hand, there may not be an actual problem besides a little
* confusion, so think about this and decide.
*/
if (HeapTupleGetOid(tup) == MyDatabaseId)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2003-06-27 16:45:32 +02:00
errmsg("current database may not be renamed")));
/*
2003-08-04 02:43:34 +02:00
* Make sure the database does not have active sessions. Might not be
* necessary, but it's consistent with other database operations.
2003-06-27 16:45:32 +02:00
*/
if (DatabaseHasActiveBackends(HeapTupleGetOid(tup), false))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
2003-08-04 02:43:34 +02:00
errmsg("database \"%s\" is being accessed by other users",
oldname)));
2003-06-27 16:45:32 +02:00
/* make sure the new name doesn't exist */
ScanKeyInit(&key2,
Anum_pg_database_datname,
BTEqualStrategyNumber, F_NAMEEQ,
NameGetDatum(newname));
scan2 = systable_beginscan(rel, DatabaseNameIndex, true,
SnapshotNow, 1, &key2);
2003-06-27 16:45:32 +02:00
if (HeapTupleIsValid(systable_getnext(scan2)))
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_DATABASE),
2003-06-27 16:45:32 +02:00
errmsg("database \"%s\" already exists", newname)));
systable_endscan(scan2);
/* must be owner */
if (!pg_database_ownercheck(HeapTupleGetOid(tup), GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
oldname);
2003-06-27 16:45:32 +02:00
/* must have createdb rights */
if (!superuser() && !have_createdb_privilege())
2003-06-27 16:45:32 +02:00
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied to rename database")));
2003-06-27 16:45:32 +02:00
/* rename */
newtup = heap_copytuple(tup);
namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
simple_heap_update(rel, &newtup->t_self, newtup);
2003-06-27 16:45:32 +02:00
CatalogUpdateIndexes(rel, newtup);
systable_endscan(scan);
/* Close pg_database, but keep exclusive lock till commit */
heap_close(rel, NoLock);
/*
* Set flag to update flat database file at commit.
*/
database_file_update_needed();
2003-06-27 16:45:32 +02:00
}
/*
* ALTER DATABASE name SET ...
*/
void
AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
{
char *valuestr;
HeapTuple tuple,
newtuple;
Relation rel;
2002-09-04 22:31:48 +02:00
ScanKeyData scankey;
2003-08-04 02:43:34 +02:00
SysScanDesc scan;
Datum repl_val[Natts_pg_database];
char repl_null[Natts_pg_database];
char repl_repl[Natts_pg_database];
valuestr = flatten_set_variable_args(stmt->variable, stmt->value);
/*
* We don't need ExclusiveLock since we aren't updating the
* flat file.
*/
rel = heap_openr(DatabaseRelationName, RowExclusiveLock);
ScanKeyInit(&scankey,
Anum_pg_database_datname,
BTEqualStrategyNumber, F_NAMEEQ,
NameGetDatum(stmt->dbname));
scan = systable_beginscan(rel, DatabaseNameIndex, true,
SnapshotNow, 1, &scankey);
2003-06-27 16:45:32 +02:00
tuple = systable_getnext(scan);
if (!HeapTupleIsValid(tuple))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database \"%s\" does not exist", stmt->dbname)));
if (!(superuser()
2003-08-04 02:43:34 +02:00
|| ((Form_pg_database) GETSTRUCT(tuple))->datdba == GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
2003-08-04 02:43:34 +02:00
stmt->dbname);
MemSet(repl_repl, ' ', sizeof(repl_repl));
2002-09-04 22:31:48 +02:00
repl_repl[Anum_pg_database_datconfig - 1] = 'r';
2002-09-04 22:31:48 +02:00
if (strcmp(stmt->variable, "all") == 0 && valuestr == NULL)
{
/* RESET ALL */
2002-09-04 22:31:48 +02:00
repl_null[Anum_pg_database_datconfig - 1] = 'n';
repl_val[Anum_pg_database_datconfig - 1] = (Datum) 0;
}
else
{
2002-09-04 22:31:48 +02:00
Datum datum;
bool isnull;
ArrayType *a;
2002-09-04 22:31:48 +02:00
repl_null[Anum_pg_database_datconfig - 1] = ' ';
datum = heap_getattr(tuple, Anum_pg_database_datconfig,
RelationGetDescr(rel), &isnull);
a = isnull ? NULL : DatumGetArrayTypeP(datum);
if (valuestr)
a = GUCArrayAdd(a, stmt->variable, valuestr);
else
a = GUCArrayDelete(a, stmt->variable);
if (a)
repl_val[Anum_pg_database_datconfig - 1] = PointerGetDatum(a);
else
repl_null[Anum_pg_database_datconfig - 1] = 'n';
}
newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
simple_heap_update(rel, &tuple->t_self, newtuple);
/* Update indexes */
CatalogUpdateIndexes(rel, newtuple);
2003-06-27 16:45:32 +02:00
systable_endscan(scan);
/* Close pg_database, but keep lock till commit */
heap_close(rel, NoLock);
/*
* We don't bother updating the flat file since ALTER DATABASE SET
* doesn't affect it.
*/
}
/*
* ALTER DATABASE name OWNER TO newowner
*/
void
AlterDatabaseOwner(const char *dbname, AclId newOwnerSysId)
{
HeapTuple tuple;
Relation rel;
ScanKeyData scankey;
SysScanDesc scan;
2004-08-29 07:07:03 +02:00
Form_pg_database datForm;
/*
* We don't need ExclusiveLock since we aren't updating the
* flat file.
*/
rel = heap_openr(DatabaseRelationName, RowExclusiveLock);
ScanKeyInit(&scankey,
Anum_pg_database_datname,
BTEqualStrategyNumber, F_NAMEEQ,
NameGetDatum(dbname));
scan = systable_beginscan(rel, DatabaseNameIndex, true,
SnapshotNow, 1, &scankey);
tuple = systable_getnext(scan);
if (!HeapTupleIsValid(tuple))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database \"%s\" does not exist", dbname)));
datForm = (Form_pg_database) GETSTRUCT(tuple);
2004-08-29 07:07:03 +02:00
/*
* If the new owner is the same as the existing owner, consider the
2004-08-29 07:07:03 +02:00
* command to have succeeded. This is to be consistent with other
* objects.
*/
if (datForm->datdba != newOwnerSysId)
{
Datum repl_val[Natts_pg_database];
char repl_null[Natts_pg_database];
char repl_repl[Natts_pg_database];
2004-08-29 07:07:03 +02:00
Acl *newAcl;
Datum aclDatum;
bool isNull;
HeapTuple newtuple;
/* must be superuser to change ownership */
if (!superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to change owner")));
memset(repl_null, ' ', sizeof(repl_null));
memset(repl_repl, ' ', sizeof(repl_repl));
repl_repl[Anum_pg_database_datdba - 1] = 'r';
repl_val[Anum_pg_database_datdba - 1] = Int32GetDatum(newOwnerSysId);
/*
* Determine the modified ACL for the new owner. This is only
* necessary when the ACL is non-null.
*/
aclDatum = heap_getattr(tuple,
2004-08-29 07:07:03 +02:00
Anum_pg_database_datacl,
RelationGetDescr(rel),
&isNull);
if (!isNull)
{
newAcl = aclnewowner(DatumGetAclP(aclDatum),
datForm->datdba, newOwnerSysId);
repl_repl[Anum_pg_database_datacl - 1] = 'r';
repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
}
newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
simple_heap_update(rel, &newtuple->t_self, newtuple);
CatalogUpdateIndexes(rel, newtuple);
heap_freetuple(newtuple);
}
systable_endscan(scan);
/* Close pg_database, but keep lock till commit */
heap_close(rel, NoLock);
/*
* We don't bother updating the flat file since ALTER DATABASE OWNER
* doesn't affect it.
*/
}
/*
* Helper functions
*/
static bool
get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
Oid *dbLastSysOidP,
TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
Oid *dbTablespace)
{
Relation relation;
ScanKeyData scanKey;
2003-08-04 02:43:34 +02:00
SysScanDesc scan;
HeapTuple tuple;
bool gottuple;
AssertArg(name);
/* Caller may wish to grab a better lock on pg_database beforehand... */
relation = heap_openr(DatabaseRelationName, AccessShareLock);
ScanKeyInit(&scanKey,
Anum_pg_database_datname,
BTEqualStrategyNumber, F_NAMEEQ,
NameGetDatum(name));
scan = systable_beginscan(relation, DatabaseNameIndex, true,
SnapshotNow, 1, &scanKey);
2003-06-27 16:45:32 +02:00
tuple = systable_getnext(scan);
gottuple = HeapTupleIsValid(tuple);
if (gottuple)
{
Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
/* oid of the database */
if (dbIdP)
*dbIdP = HeapTupleGetOid(tuple);
/* sysid of the owner */
if (ownerIdP)
*ownerIdP = dbform->datdba;
/* character encoding */
if (encodingP)
*encodingP = dbform->encoding;
/* allowed as template? */
if (dbIsTemplateP)
*dbIsTemplateP = dbform->datistemplate;
/* allowing connections? */
if (dbAllowConnP)
*dbAllowConnP = dbform->datallowconn;
/* last system OID used in database */
if (dbLastSysOidP)
*dbLastSysOidP = dbform->datlastsysoid;
/* limit of vacuumed XIDs */
if (dbVacuumXidP)
*dbVacuumXidP = dbform->datvacuumxid;
/* limit of frozen XIDs */
if (dbFrozenXidP)
*dbFrozenXidP = dbform->datfrozenxid;
/* default tablespace for this database */
if (dbTablespace)
*dbTablespace = dbform->dattablespace;
}
2003-06-27 16:45:32 +02:00
systable_endscan(scan);
heap_close(relation, AccessShareLock);
return gottuple;
}
/* Check if current user has createdb privileges */
static bool
have_createdb_privilege(void)
{
bool result = false;
HeapTuple utup;
1997-11-07 07:38:51 +01:00
utup = SearchSysCache(SHADOWSYSID,
Int32GetDatum(GetUserId()),
0, 0, 0);
if (HeapTupleIsValid(utup))
{
result = ((Form_pg_shadow) GETSTRUCT(utup))->usecreatedb;
ReleaseSysCache(utup);
}
return result;
}
/*
* Remove tablespace directories
*
* We don't know what tablespaces db_id is using, so iterate through all
* tablespaces removing <tablespace>/db_id
*/
static void
remove_dbtablespaces(Oid db_id)
{
2004-08-29 07:07:03 +02:00
Relation rel;
HeapScanDesc scan;
2004-08-29 07:07:03 +02:00
HeapTuple tuple;
rel = heap_openr(TableSpaceRelationName, AccessShareLock);
scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
2004-08-29 07:07:03 +02:00
Oid dsttablespace = HeapTupleGetOid(tuple);
char *dstpath;
struct stat st;
/* Don't mess with the global tablespace */
if (dsttablespace == GLOBALTABLESPACE_OID)
continue;
dstpath = GetDatabasePath(db_id, dsttablespace);
if (stat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
{
/* Assume we can ignore it */
pfree(dstpath);
continue;
}
2004-08-01 08:19:26 +02:00
if (!rmtree(dstpath, true))
ereport(WARNING,
2004-08-29 07:07:03 +02:00
(errmsg("could not remove database directory \"%s\"",
dstpath)));
/* Record the filesystem change in XLOG */
{
xl_dbase_drop_rec xlrec;
XLogRecData rdata[1];
xlrec.db_id = db_id;
xlrec.tablespace_id = dsttablespace;
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char *) &xlrec;
rdata[0].len = sizeof(xl_dbase_drop_rec);
rdata[0].next = NULL;
(void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP, rdata);
}
pfree(dstpath);
}
heap_endscan(scan);
heap_close(rel, AccessShareLock);
}
/*
* get_database_oid - given a database name, look up the OID
*
* Returns InvalidOid if database name not found.
*
* This is not actually used in this file, but is exported for use elsewhere.
*/
Oid
get_database_oid(const char *dbname)
{
Relation pg_database;
ScanKeyData entry[1];
2003-08-04 02:43:34 +02:00
SysScanDesc scan;
HeapTuple dbtuple;
Oid oid;
/* There's no syscache for pg_database, so must look the hard way */
pg_database = heap_openr(DatabaseRelationName, AccessShareLock);
ScanKeyInit(&entry[0],
Anum_pg_database_datname,
BTEqualStrategyNumber, F_NAMEEQ,
CStringGetDatum(dbname));
scan = systable_beginscan(pg_database, DatabaseNameIndex, true,
SnapshotNow, 1, entry);
2003-06-27 16:45:32 +02:00
dbtuple = systable_getnext(scan);
/* We assume that there can be at most one matching tuple */
if (HeapTupleIsValid(dbtuple))
oid = HeapTupleGetOid(dbtuple);
else
oid = InvalidOid;
2003-06-27 16:45:32 +02:00
systable_endscan(scan);
heap_close(pg_database, AccessShareLock);
return oid;
}
2003-06-27 16:45:32 +02:00
/*
2003-06-27 16:45:32 +02:00
* get_database_name - given a database OID, look up the name
*
* Returns a palloc'd string, or NULL if no such database.
*
* This is not actually used in this file, but is exported for use elsewhere.
*/
2003-06-27 16:45:32 +02:00
char *
get_database_name(Oid dbid)
{
Relation pg_database;
ScanKeyData entry[1];
2003-08-04 02:43:34 +02:00
SysScanDesc scan;
HeapTuple dbtuple;
2003-06-27 16:45:32 +02:00
char *result;
/* There's no syscache for pg_database, so must look the hard way */
pg_database = heap_openr(DatabaseRelationName, AccessShareLock);
ScanKeyInit(&entry[0],
ObjectIdAttributeNumber,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(dbid));
scan = systable_beginscan(pg_database, DatabaseOidIndex, true,
SnapshotNow, 1, entry);
2003-06-27 16:45:32 +02:00
dbtuple = systable_getnext(scan);
2003-06-27 16:45:32 +02:00
/* We assume that there can be at most one matching tuple */
if (HeapTupleIsValid(dbtuple))
result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
else
result = NULL;
2003-06-27 16:45:32 +02:00
systable_endscan(scan);
heap_close(pg_database, AccessShareLock);
2003-06-27 16:45:32 +02:00
return result;
}
/*
* DATABASE resource manager's routines
*/
void
dbase_redo(XLogRecPtr lsn, XLogRecord *record)
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
if (info == XLOG_DBASE_CREATE)
{
xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
char *src_path;
char *dst_path;
struct stat st;
#ifndef WIN32
char buf[2 * MAXPGPATH + 100];
#endif
src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
/*
* Our theory for replaying a CREATE is to forcibly drop the
* target subdirectory if present, then re-copy the source data.
* This may be more work than needed, but it is simple to
* implement.
*/
if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
{
if (!rmtree(dst_path, true))
ereport(WARNING,
(errmsg("could not remove database directory \"%s\"",
dst_path)));
}
/*
* Force dirty buffers out to disk, to ensure source database is
* up-to-date for the copy. (We really only need to flush buffers for
* the source database, but bufmgr.c provides no API for that.)
*/
BufferSync();
#ifndef WIN32
/*
* Copy this subdirectory to the new location
*
* XXX use of cp really makes this code pretty grotty, particularly
* with respect to lack of ability to report errors well. Someday
* rewrite to do it for ourselves.
*/
/* We might need to use cp -R one day for portability */
snprintf(buf, sizeof(buf), "cp -r '%s' '%s'",
src_path, dst_path);
if (system(buf) != 0)
ereport(ERROR,
(errmsg("could not initialize database directory"),
errdetail("Failing system command was: %s", buf),
errhint("Look in the postmaster's stderr log for more information.")));
#else /* WIN32 */
if (copydir(src_path, dst_path) != 0)
{
/* copydir should already have given details of its troubles */
ereport(ERROR,
(errmsg("could not initialize database directory")));
}
#endif /* WIN32 */
}
else if (info == XLOG_DBASE_DROP)
{
xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
char *dst_path;
dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
/*
* Drop pages for this database that are in the shared buffer
* cache
*/
DropBuffers(xlrec->db_id);
if (!rmtree(dst_path, true))
ereport(WARNING,
(errmsg("could not remove database directory \"%s\"",
dst_path)));
}
else if (info == XLOG_DBASE_CREATE_OLD)
{
xl_dbase_create_rec_old *xlrec = (xl_dbase_create_rec_old *) XLogRecGetData(record);
char *dst_path = xlrec->src_path + strlen(xlrec->src_path) + 1;
struct stat st;
#ifndef WIN32
char buf[2 * MAXPGPATH + 100];
#endif
/*
* Our theory for replaying a CREATE is to forcibly drop the
* target subdirectory if present, then re-copy the source data.
* This may be more work than needed, but it is simple to
* implement.
*/
if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
{
if (!rmtree(dst_path, true))
ereport(WARNING,
(errmsg("could not remove database directory \"%s\"",
dst_path)));
}
/*
* Force dirty buffers out to disk, to ensure source database is
* up-to-date for the copy. (We really only need to flush buffers for
* the source database, but bufmgr.c provides no API for that.)
*/
BufferSync();
#ifndef WIN32
/*
* Copy this subdirectory to the new location
*
* XXX use of cp really makes this code pretty grotty, particularly
* with respect to lack of ability to report errors well. Someday
* rewrite to do it for ourselves.
*/
/* We might need to use cp -R one day for portability */
snprintf(buf, sizeof(buf), "cp -r '%s' '%s'",
xlrec->src_path, dst_path);
if (system(buf) != 0)
ereport(ERROR,
(errmsg("could not initialize database directory"),
errdetail("Failing system command was: %s", buf),
errhint("Look in the postmaster's stderr log for more information.")));
#else /* WIN32 */
if (copydir(xlrec->src_path, dst_path) != 0)
{
/* copydir should already have given details of its troubles */
ereport(ERROR,
(errmsg("could not initialize database directory")));
}
#endif /* WIN32 */
}
else if (info == XLOG_DBASE_DROP_OLD)
{
xl_dbase_drop_rec_old *xlrec = (xl_dbase_drop_rec_old *) XLogRecGetData(record);
/*
* Drop pages for this database that are in the shared buffer
* cache
*/
DropBuffers(xlrec->db_id);
if (!rmtree(xlrec->dir_path, true))
ereport(WARNING,
(errmsg("could not remove database directory \"%s\"",
xlrec->dir_path)));
}
else
elog(PANIC, "dbase_redo: unknown op code %u", info);
}
void
dbase_undo(XLogRecPtr lsn, XLogRecord *record)
{
elog(PANIC, "dbase_undo: unimplemented");
}
void
dbase_desc(char *buf, uint8 xl_info, char *rec)
{
uint8 info = xl_info & ~XLR_INFO_MASK;
if (info == XLOG_DBASE_CREATE)
{
xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) rec;
sprintf(buf + strlen(buf), "create db: copy dir %u/%u to %u/%u",
xlrec->src_db_id, xlrec->src_tablespace_id,
xlrec->db_id, xlrec->tablespace_id);
}
else if (info == XLOG_DBASE_DROP)
{
xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) rec;
sprintf(buf + strlen(buf), "drop db: dir %u/%u",
xlrec->db_id, xlrec->tablespace_id);
}
else if (info == XLOG_DBASE_CREATE_OLD)
{
xl_dbase_create_rec_old *xlrec = (xl_dbase_create_rec_old *) rec;
char *dst_path = xlrec->src_path + strlen(xlrec->src_path) + 1;
sprintf(buf + strlen(buf), "create db: %u copy \"%s\" to \"%s\"",
xlrec->db_id, xlrec->src_path, dst_path);
}
else if (info == XLOG_DBASE_DROP_OLD)
{
xl_dbase_drop_rec_old *xlrec = (xl_dbase_drop_rec_old *) rec;
sprintf(buf + strlen(buf), "drop db: %u directory: \"%s\"",
xlrec->db_id, xlrec->dir_path);
}
else
strcat(buf, "UNKNOWN");
}