From a03c0d93d5a2730f25a3b2738300dacd8ca5e24a Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Mon, 30 Dec 2002 18:42:17 +0000 Subject: [PATCH] Code review for CLUSTER ALL patch. Fix bogus locking, incorrect transaction stop/start nesting, other infelicities. --- doc/src/sgml/ref/cluster.sgml | 24 +- src/backend/commands/cluster.c | 504 +++++++++++++------------- src/backend/commands/tablecmds.c | 17 +- src/backend/tcop/utility.c | 14 +- src/include/commands/cluster.h | 15 +- src/test/regress/expected/cluster.out | 2 +- 6 files changed, 287 insertions(+), 289 deletions(-) diff --git a/doc/src/sgml/ref/cluster.sgml b/doc/src/sgml/ref/cluster.sgml index e72a8f61d1..95daa8bf17 100644 --- a/doc/src/sgml/ref/cluster.sgml +++ b/doc/src/sgml/ref/cluster.sgml @@ -1,5 +1,5 @@ @@ -108,14 +108,16 @@ CLUSTER When a table is clustered, PostgreSQL - remembers on which index it was clustered. In calls to + remembers on which index it was clustered. The form CLUSTER tablename, - the table is clustered on the same index that it was clustered before. + re-clusters the table on the same index that it was clustered before. - A simple CLUSTER clusters all the tables in the database - that the calling user owns and uses the saved cluster information. This + CLUSTER without any parameter re-clusters all the tables + in the + current database that the calling user owns, or all tables if called + by a superuser. (Never-clustered tables are not touched.) This form of CLUSTER cannot be called from inside a transaction or function. @@ -157,15 +159,15 @@ CLUSTER - CLUSTER preserves GRANT, inheritance, index, foreign - key, and other ancillary information about the table. + CLUSTER preserves GRANT, inheritance, index, foreign + key, and other ancillary information about the table. - Because CLUSTER remembers the clustering information, - one can cluster the tables one wants clustered manually the first time, and - setup a timed event similar to VACUUM so that the tables - are periodically and automatically clustered. + Because CLUSTER remembers the clustering information, + one can cluster the tables one wants clustered manually the first time, and + setup a timed event similar to VACUUM so that the tables + are periodically re-clustered. diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c index 05321d353b..0361ede009 100644 --- a/src/backend/commands/cluster.c +++ b/src/backend/commands/cluster.c @@ -11,7 +11,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/commands/cluster.c,v 1.102 2002/12/06 05:00:10 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/commands/cluster.c,v 1.103 2002/12/30 18:42:13 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -36,6 +36,7 @@ #include "utils/syscache.h" #include "utils/relcache.h" + /* * We need one of these structs for each index in the relation to be * clustered. It's basically the data needed by index_create() so @@ -51,7 +52,8 @@ typedef struct bool isclustered; } IndexAttrs; -/* This struct is used to pass around the information on tables to be +/* + * This struct is used to pass around the information on tables to be * clustered. We need this so we can make a list of them when invoked without * a specific table/index pair. */ @@ -59,21 +61,174 @@ typedef struct { Oid tableOid; Oid indexOid; - bool isPrevious; -} relToCluster; +} RelToCluster; + +static void cluster_rel(RelToCluster *rv, bool recheck); static Oid make_new_heap(Oid OIDOldHeap, const char *NewName); static void copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex); -static void recreate_indexattr(Oid OIDOldHeap, List *indexes); +static List *get_indexattr_list(Relation OldHeap, Oid OldIndex); +static void rebuild_indexes(Oid OIDOldHeap, List *indexes); static void swap_relfilenodes(Oid r1, Oid r2); -static void cluster_rel(relToCluster *rv); -static bool check_cluster_ownership(Oid relOid); -static List *get_tables_to_cluster(AclId owner); +static bool check_cluster_permitted(Oid relOid); +static List *get_tables_to_cluster(MemoryContext cluster_context); -static MemoryContext cluster_context = NULL; + + +/*--------------------------------------------------------------------------- + * This cluster code allows for clustering multiple tables at once. Because + * of this, we cannot just run everything on a single transaction, or we + * would be forced to acquire exclusive locks on all the tables being + * clustered, simultaneously --- very likely leading to deadlock. + * + * To solve this we follow a similar strategy to VACUUM code, + * clustering each relation in a separate transaction. For this to work, + * we need to: + * - provide a separate memory context so that we can pass information in + * a way that survives across transactions + * - start a new transaction every time a new relation is clustered + * - check for validity of the information on to-be-clustered relations, + * as someone might have deleted a relation behind our back, or + * clustered one on a different index + * - end the transaction + * + * The single-relation case does not have any such overhead. + * + * We also allow a relation being specified without index. In that case, + * the indisclustered bit will be looked up, and an ERROR will be thrown + * if there is no index with the bit set. + *--------------------------------------------------------------------------- + */ +void +cluster(ClusterStmt *stmt) +{ + if (stmt->relation != NULL) + { + /* This is the single-relation case. */ + Oid tableOid, + indexOid = InvalidOid; + Relation rel; + RelToCluster rvtc; + + /* Find and lock the table */ + tableOid = RangeVarGetRelid(stmt->relation, false); + + rel = heap_open(tableOid, AccessExclusiveLock); + + /* Check permissions */ + if (!check_cluster_permitted(tableOid)) + elog(ERROR, "CLUSTER: You do not own relation %s", + stmt->relation->relname); + + if (stmt->indexname == NULL) + { + List *index; + + /* We need to find the index that has indisclustered set. */ + foreach (index, RelationGetIndexList(rel)) + { + HeapTuple idxtuple; + Form_pg_index indexForm; + + indexOid = lfirsti(index); + idxtuple = SearchSysCache(INDEXRELID, + ObjectIdGetDatum(indexOid), + 0, 0, 0); + if (!HeapTupleIsValid(idxtuple)) + elog(ERROR, "Cache lookup failed for index %u", + indexOid); + indexForm = (Form_pg_index) GETSTRUCT(idxtuple); + if (indexForm->indisclustered) + { + ReleaseSysCache(idxtuple); + break; + } + ReleaseSysCache(idxtuple); + indexOid = InvalidOid; + } + + if (!OidIsValid(indexOid)) + elog(ERROR, "CLUSTER: No previously clustered index found on table \"%s\"", + stmt->relation->relname); + } + else + { + /* The index is expected to be in the same namespace as the relation. */ + indexOid = get_relname_relid(stmt->indexname, + rel->rd_rel->relnamespace); + if (!OidIsValid(indexOid)) + elog(ERROR, "CLUSTER: cannot find index \"%s\" for table \"%s\"", + stmt->indexname, stmt->relation->relname); + } + + rvtc.tableOid = tableOid; + rvtc.indexOid = indexOid; + + /* close relation, keep lock till commit */ + heap_close(rel, NoLock); + + /* Do the job */ + cluster_rel(&rvtc, false); + } + else + { + /* + * This is the "multi relation" case. We need to cluster all tables + * that have some index with indisclustered set. + */ + MemoryContext cluster_context; + List *rv, + *rvs; + + /* + * We cannot run this form of CLUSTER inside a user transaction block; + * we'd be holding locks way too long. + */ + PreventTransactionChain((void *) stmt, "CLUSTER"); + + /* + * Create special memory context for cross-transaction storage. + * + * Since it is a child of QueryContext, it will go away even in case + * of error. + */ + cluster_context = AllocSetContextCreate(QueryContext, + "Cluster", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + /* + * Build the list of relations to cluster. Note that this lives in + * cluster_context. + */ + rvs = get_tables_to_cluster(cluster_context); + + /* Commit to get out of starting transaction */ + CommitTransactionCommand(true); + + /* Ok, now that we've got them all, cluster them one by one */ + foreach (rv, rvs) + { + RelToCluster *rvtc = (RelToCluster *) lfirst(rv); + + /* Start a new transaction for each relation. */ + StartTransactionCommand(true); + SetQuerySnapshot(); /* might be needed for functional index */ + cluster_rel(rvtc, true); + CommitTransactionCommand(true); + } + + /* Start a new transaction for the cleanup work. */ + StartTransactionCommand(true); + + /* Clean up working storage */ + MemoryContextDelete(cluster_context); + } +} /* - * cluster + * cluster_rel * * This clusters the table by creating a new, clustered table and * swapping the relfilenodes of the new table and the old table, so @@ -85,45 +240,52 @@ static MemoryContext cluster_context = NULL; * same way we do for the relation. Since we are effectively bulk-loading * the new table, it's better to create the indexes afterwards than to fill * them incrementally while we load the table. - * - * Since we may open a new transaction for each relation, we have to - * check that the relation still is what we think it is. */ -void -cluster_rel(relToCluster *rvtc) +static void +cluster_rel(RelToCluster *rvtc, bool recheck) { Relation OldHeap, OldIndex; - List *indexes; /* Check for user-requested abort. */ CHECK_FOR_INTERRUPTS(); - /* Check if the relation and index still exist before opening them + /* + * Since we may open a new transaction for each relation, we have to + * check that the relation still is what we think it is. + * + * If this is a single-transaction CLUSTER, we can skip these tests. + * We *must* skip the one on indisclustered since it would reject an + * attempt to cluster a not-previously-clustered index. */ - if (!SearchSysCacheExists(RELOID, - ObjectIdGetDatum(rvtc->tableOid), - 0, 0, 0) || - !SearchSysCacheExists(RELOID, - ObjectIdGetDatum(rvtc->indexOid), - 0, 0, 0)) - return; - - /* Check that the user still owns the relation */ - if (!check_cluster_ownership(rvtc->tableOid)) - return; - - /* Check that the index is still the one with indisclustered set. - * If this is a standalone cluster, skip this test. - */ - if (rvtc->isPrevious) + if (recheck) { HeapTuple tuple; Form_pg_index indexForm; + /* + * Check if the relation and index still exist before opening them + */ + if (!SearchSysCacheExists(RELOID, + ObjectIdGetDatum(rvtc->tableOid), + 0, 0, 0) || + !SearchSysCacheExists(RELOID, + ObjectIdGetDatum(rvtc->indexOid), + 0, 0, 0)) + return; + + /* Check that the user still owns the relation */ + if (!check_cluster_permitted(rvtc->tableOid)) + return; + + /* + * Check that the index is still the one with indisclustered set. + */ tuple = SearchSysCache(INDEXRELID, ObjectIdGetDatum(rvtc->indexOid), 0, 0, 0); + if (!HeapTupleIsValid(tuple)) + return; /* could have gone away... */ indexForm = (Form_pg_index) GETSTRUCT(tuple); if (!indexForm->indisclustered) { @@ -135,7 +297,8 @@ cluster_rel(relToCluster *rvtc) /* * We grab exclusive access to the target rel and index for the - * duration of the transaction. + * duration of the transaction. (This is redundant for the single- + * transaction case, since cluster() already did it.) */ OldHeap = heap_open(rvtc->tableOid, AccessExclusiveLock); @@ -162,29 +325,43 @@ cluster_rel(relToCluster *rvtc) elog(ERROR, "CLUSTER: cannot cluster system relation \"%s\"", RelationGetRelationName(OldHeap)); - /* Save the information of all indexes on the relation. */ - indexes = get_indexattr_list(OldHeap, rvtc->indexOid); - - /* Drop relcache refcnts, but do NOT give up the locks */ + /* Drop relcache refcnt on OldIndex, but keep lock */ index_close(OldIndex); - heap_close(OldHeap, NoLock); - /* rebuild_rel does all the dirty work */ - rebuild_rel(rvtc->tableOid, rvtc->indexOid, indexes, true); + /* rebuild_relation does all the dirty work */ + rebuild_relation(OldHeap, rvtc->indexOid); + + /* NB: rebuild_relation does heap_close() on OldHeap */ } +/* + * rebuild_relation: rebuild an existing relation + * + * This is shared code between CLUSTER and TRUNCATE. In the TRUNCATE + * case, the new relation is built and left empty. In the CLUSTER case, + * it is filled with data read from the old relation in the order specified + * by the index. + * + * OldHeap: table to rebuild --- must be opened and exclusive-locked! + * indexOid: index to cluster by, or InvalidOid in TRUNCATE case + * + * NB: this routine closes OldHeap at the right time; caller should not. + */ void -rebuild_rel(Oid tableOid, Oid indexOid, List *indexes, bool dataCopy) +rebuild_relation(Relation OldHeap, Oid indexOid) { + Oid tableOid = RelationGetRelid(OldHeap); + List *indexes; Oid OIDNewHeap; char NewHeapName[NAMEDATALEN]; ObjectAddress object; - /* - * If dataCopy is true, we assume that we will be basing the - * copy off an index for cluster operations. - */ - Assert(!dataCopy || OidIsValid(indexOid)); + /* Save the information about all indexes on the relation. */ + indexes = get_indexattr_list(OldHeap, indexOid); + + /* Close relcache entry, but keep lock until transaction commit */ + heap_close(OldHeap, NoLock); + /* * Create the new heap, using a temporary name in the same namespace * as the existing table. NOTE: there is some risk of collision with @@ -204,7 +381,7 @@ rebuild_rel(Oid tableOid, Oid indexOid, List *indexes, bool dataCopy) /* * Copy the heap data into the new table in the desired order. */ - if (dataCopy) + if (OidIsValid(indexOid)) copy_heap_data(OIDNewHeap, tableOid, indexOid); /* To make the new heap's data visible (probably not needed?). */ @@ -230,9 +407,9 @@ rebuild_rel(Oid tableOid, Oid indexOid, List *indexes, bool dataCopy) /* * Recreate each index on the relation. We do not need - * CommandCounterIncrement() because recreate_indexattr does it. + * CommandCounterIncrement() because rebuild_indexes does it. */ - recreate_indexattr(tableOid, indexes); + rebuild_indexes(tableOid, indexes); } /* @@ -335,7 +512,7 @@ copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex) * Get the necessary info about the indexes of the relation and * return a list of IndexAttrs structures. */ -List * +static List * get_indexattr_list(Relation OldHeap, Oid OldIndex) { List *indexes = NIL; @@ -366,7 +543,8 @@ get_indexattr_list(Relation OldHeap, Oid OldIndex) palloc(sizeof(Oid) * attrs->indexInfo->ii_NumIndexAttrs); memcpy(attrs->classOID, indexForm->indclass, sizeof(Oid) * attrs->indexInfo->ii_NumIndexAttrs); - attrs->isclustered = (OldIndex == indexOID); + /* We adjust the isclustered attribute to correct new state */ + attrs->isclustered = (indexOID == OldIndex); /* Name and access method of each index come from pg_class */ classTuple = SearchSysCache(RELOID, @@ -397,7 +575,7 @@ get_indexattr_list(Relation OldHeap, Oid OldIndex) * the new index (carrying the old index filenode along). */ static void -recreate_indexattr(Oid OIDOldHeap, List *indexes) +rebuild_indexes(Oid OIDOldHeap, List *indexes) { List *elem; @@ -629,235 +807,69 @@ swap_relfilenodes(Oid r1, Oid r2) heap_close(relRelation, RowExclusiveLock); } -/*--------------------------------------------------------------------------- - * This cluster code allows for clustering multiple tables at once. Because - * of this, we cannot just run everything on a single transaction, or we - * would be forced to acquire exclusive locks on all the tables being - * clustered. To solve this we follow a similar strategy to VACUUM code, - * clustering each relation in a separate transaction. For this to work, - * we need to: - * - provide a separate memory context so that we can pass information in - * a way that trascends transactions - * - start a new transaction every time a new relation is clustered - * - check for validity of the information on to-be-clustered relations, - * as someone might have deleted a relation behind our back, or - * clustered one on a different index - * - end the transaction - * - * The single relation code does not have any overhead. - * - * We also allow a relation being specified without index. In that case, - * the indisclustered bit will be looked up, and an ERROR will be thrown - * if there is no index with the bit set. - *--------------------------------------------------------------------------- +/* + * Checks if the user is allowed to cluster (ie, owns) the relation. + * Superusers are allowed to cluster any table. */ -void -cluster(ClusterStmt *stmt) -{ - - /* This is the single relation case. */ - if (stmt->relation != NULL) - { - Oid indexOid = InvalidOid, - tableOid; - relToCluster rvtc; - HeapTuple tuple; - Form_pg_class classForm; - - tableOid = RangeVarGetRelid(stmt->relation, false); - if (!check_cluster_ownership(tableOid)) - elog(ERROR, "CLUSTER: You do not own relation %s", - stmt->relation->relname); - - tuple = SearchSysCache(RELOID, - ObjectIdGetDatum(tableOid), - 0, 0, 0); - if (!HeapTupleIsValid(tuple)) - elog(ERROR, "Cache lookup failed for relation %u", tableOid); - classForm = (Form_pg_class) GETSTRUCT(tuple); - - if (stmt->indexname == NULL) - { - List *index; - Relation rel = RelationIdGetRelation(tableOid); - HeapTuple ituple = NULL, - idxtuple = NULL; - - /* We need to fetch the index that has indisclustered set. */ - foreach (index, RelationGetIndexList(rel)) - { - Form_pg_index indexForm; - - indexOid = lfirsti(index); - ituple = SearchSysCache(RELOID, - ObjectIdGetDatum(indexOid), - 0, 0, 0); - if (!HeapTupleIsValid(ituple)) - elog(ERROR, "Cache lookup failed for relation %u", indexOid); - idxtuple = SearchSysCache(INDEXRELID, - ObjectIdGetDatum(HeapTupleGetOid(ituple)), - 0, 0, 0); - if (!HeapTupleIsValid(idxtuple)) - elog(ERROR, "Cache lookup failed for index %u", HeapTupleGetOid(ituple)); - indexForm = (Form_pg_index) GETSTRUCT(idxtuple); - if (indexForm->indisclustered) - break; - indexOid = InvalidOid; - } - if (indexOid == InvalidOid) - elog(ERROR, "CLUSTER: No previously clustered index found on table %s", - stmt->relation->relname); - RelationClose(rel); - ReleaseSysCache(ituple); - ReleaseSysCache(idxtuple); - } - else - { - /* The index is expected to be in the same namespace as the relation. */ - indexOid = get_relname_relid(stmt->indexname, classForm->relnamespace); - } - ReleaseSysCache(tuple); - - /* XXX Maybe the namespace should be reported as well */ - if (!OidIsValid(indexOid)) - elog(ERROR, "CLUSTER: cannot find index \"%s\" for table \"%s\"", - stmt->indexname, stmt->relation->relname); - rvtc.tableOid = tableOid; - rvtc.indexOid = indexOid; - rvtc.isPrevious = false; - - /* Do the job */ - cluster_rel(&rvtc); - } - else - { - /* - * This is the "no relation" case. We need to cluster all tables - * that have some index with indisclustered set. - */ - - relToCluster *rvtc; - List *rv, - *rvs; - - /* - * We cannot run CLUSTER inside a user transaction block; if we were inside - * a transaction, then our commit- and start-transaction-command calls - * would not have the intended effect! - */ - if (IsTransactionBlock()) - elog(ERROR, "CLUSTER cannot run inside a BEGIN/END block"); - - /* Running CLUSTER from a function would free the function context */ - if (!MemoryContextContains(QueryContext, stmt)) - elog(ERROR, "CLUSTER cannot be called from a function"); - /* - * Create special memory context for cross-transaction storage. - * - * Since it is a child of QueryContext, it will go away even in case - * of error. - */ - cluster_context = AllocSetContextCreate(QueryContext, - "Cluster", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); - - /* - * Build the list of relations to cluster. Note that this lives in - * cluster_context. - */ - rvs = get_tables_to_cluster(GetUserId()); - - /* Ok, now that we've got them all, cluster them one by one */ - foreach (rv, rvs) - { - rvtc = (relToCluster *)lfirst(rv); - - /* Start a new transaction for this relation. */ - StartTransactionCommand(true); - cluster_rel(rvtc); - CommitTransactionCommand(true); - } - } - - /* Start a new transaction for the cleanup work. */ - StartTransactionCommand(true); - - /* Clean up working storage */ - if (stmt->relation == NULL) - { - MemoryContextDelete(cluster_context); - cluster_context = NULL; - } -} - -/* Checks if the user owns the relation. Superusers - * are allowed to cluster any table. - */ -bool -check_cluster_ownership(Oid relOid) +static bool +check_cluster_permitted(Oid relOid) { /* Superusers bypass this check */ return pg_class_ownercheck(relOid, GetUserId()); } -/* Get a list of tables that the current user owns and +/* + * Get a list of tables that the current user owns and * have indisclustered set. Return the list in a List * of rvsToCluster * with the tableOid and the indexOid on which the table is already * clustered. */ -List * -get_tables_to_cluster(AclId owner) +static List * +get_tables_to_cluster(MemoryContext cluster_context) { Relation indRelation; HeapScanDesc scan; ScanKeyData entry; HeapTuple indexTuple; Form_pg_index index; - relToCluster *rvtc; + MemoryContext old_context; + RelToCluster *rvtc; List *rvs = NIL; /* - * Get all indexes that have indisclustered set. System - * relations or nailed-in relations cannot ever have - * indisclustered set, because CLUSTER will refuse to - * set it when called with one of them as argument. + * Get all indexes that have indisclustered set and are owned by + * appropriate user. System relations or nailed-in relations cannot ever + * have indisclustered set, because CLUSTER will refuse to set it when + * called with one of them as argument. */ - indRelation = relation_openr(IndexRelationName, RowExclusiveLock); - ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indisclustered, - F_BOOLEQ, true); + indRelation = relation_openr(IndexRelationName, AccessShareLock); + ScanKeyEntryInitialize(&entry, 0, + Anum_pg_index_indisclustered, + F_BOOLEQ, + BoolGetDatum(true)); scan = heap_beginscan(indRelation, SnapshotNow, 1, &entry); while ((indexTuple = heap_getnext(scan, ForwardScanDirection)) != NULL) { - MemoryContext old_context = NULL; - index = (Form_pg_index) GETSTRUCT(indexTuple); - if (!check_cluster_ownership(index->indrelid)) + if (!check_cluster_permitted(index->indrelid)) continue; /* - * We have to build the struct in a different memory context so + * We have to build the list in a different memory context so * it will survive the cross-transaction processing */ - old_context = MemoryContextSwitchTo(cluster_context); - rvtc = (relToCluster *)palloc(sizeof(relToCluster)); - rvtc->indexOid = index->indexrelid; + rvtc = (RelToCluster *) palloc(sizeof(RelToCluster)); rvtc->tableOid = index->indrelid; - rvtc->isPrevious = true; - rvs = lcons((void *)rvtc, rvs); + rvtc->indexOid = index->indexrelid; + rvs = lcons(rvtc, rvs); MemoryContextSwitchTo(old_context); } heap_endscan(scan); - /* - * Release the lock on pg_index. We will check the indexes - * later again. - * - */ - relation_close(indRelation, RowExclusiveLock); + relation_close(indRelation, AccessShareLock); + return rvs; } diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 2a1a2b4cf4..29edb61638 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/commands/tablecmds.c,v 1.62 2002/12/16 18:39:22 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/commands/tablecmds.c,v 1.63 2002/12/30 18:42:14 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -355,7 +355,7 @@ RemoveRelation(const RangeVar *relation, DropBehavior behavior) * Removes all the rows from a relation. * * Note: This routine only does safety and permissions checks; - * rebuild_rel in cluster.c does the actual work. + * rebuild_relation in cluster.c does the actual work. */ void TruncateRelation(const RangeVar *relation) @@ -366,7 +366,6 @@ TruncateRelation(const RangeVar *relation) Relation fkeyRel; SysScanDesc fkeyScan; HeapTuple tuple; - List *indexes; /* Grab exclusive lock in preparation for truncate */ rel = heap_openrv(relation, AccessExclusiveLock); @@ -433,17 +432,13 @@ TruncateRelation(const RangeVar *relation) systable_endscan(fkeyScan); heap_close(fkeyRel, AccessShareLock); - /* Save the information of all indexes on the relation. */ - indexes = get_indexattr_list(rel, InvalidOid); - - /* Keep the lock until transaction commit */ - heap_close(rel, NoLock); - /* * Do the real work using the same technique as cluster, but - * without the code copy portion + * without the data-copying portion */ - rebuild_rel(relid, InvalidOid, indexes, false); + rebuild_relation(rel, InvalidOid); + + /* NB: rebuild_relation does heap_close() */ } /*---------- diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index cc0f139b2d..962c531150 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -10,7 +10,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/tcop/utility.c,v 1.186 2002/12/30 15:31:48 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/tcop/utility.c,v 1.187 2002/12/30 18:42:16 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -187,7 +187,6 @@ ProcessUtility(Node *parsetree, CommandDest dest, char *completionTag) { - if (completionTag) completionTag[0] = '\0'; @@ -195,7 +194,6 @@ ProcessUtility(Node *parsetree, { /* * ******************************** transactions ******************************** - * */ case T_TransactionStmt: { @@ -742,11 +740,7 @@ ProcessUtility(Node *parsetree, break; case T_ClusterStmt: - { - ClusterStmt *stmt = (ClusterStmt *) parsetree; - - cluster(stmt); - } + cluster((ClusterStmt *) parsetree); break; case T_VacuumStmt: @@ -874,7 +868,6 @@ ProcessUtility(Node *parsetree, switch (stmt->reindexType) { - char *relname; case INDEX: CheckOwnership(stmt->relation, false); ReindexIndex(stmt->relation, stmt->force); @@ -884,8 +877,7 @@ ProcessUtility(Node *parsetree, ReindexTable(stmt->relation, stmt->force); break; case DATABASE: - relname = (char *) stmt->name; - ReindexDatabase(relname, stmt->force, false); + ReindexDatabase(stmt->name, stmt->force, false); break; } break; diff --git a/src/include/commands/cluster.h b/src/include/commands/cluster.h index c83db66725..7eb11d60b0 100644 --- a/src/include/commands/cluster.h +++ b/src/include/commands/cluster.h @@ -6,22 +6,19 @@ * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group * Portions Copyright (c) 1994-5, Regents of the University of California * - * $Id: cluster.h,v 1.17 2002/11/23 04:05:52 momjian Exp $ + * $Id: cluster.h,v 1.18 2002/12/30 18:42:16 tgl Exp $ * *------------------------------------------------------------------------- */ #ifndef CLUSTER_H #define CLUSTER_H -#include -/* - * functions - */ +#include "nodes/parsenodes.h" +#include "utils/rel.h" + + extern void cluster(ClusterStmt *stmt); -extern List *get_indexattr_list(Relation OldHeap, Oid OldIndex); -extern void rebuild_rel(Oid tableOid, Oid indexOid, - List *indexes, bool dataCopy); - +extern void rebuild_relation(Relation OldHeap, Oid indexOid); #endif /* CLUSTER_H */ diff --git a/src/test/regress/expected/cluster.out b/src/test/regress/expected/cluster.out index 105b4c8ec2..48daf0fc25 100644 --- a/src/test/regress/expected/cluster.out +++ b/src/test/regress/expected/cluster.out @@ -304,7 +304,7 @@ INSERT INTO clstr_3 VALUES (2); INSERT INTO clstr_3 VALUES (1); -- "CLUSTER " on a table that hasn't been clustered CLUSTER clstr_2; -ERROR: CLUSTER: No previously clustered index found on table clstr_2 +ERROR: CLUSTER: No previously clustered index found on table "clstr_2" CLUSTER clstr_1_pkey ON clstr_1; CLUSTER clstr_2_pkey ON clstr_2; SELECT * FROM clstr_1 UNION ALL