Add support for --jobs in reindexdb
When doing a schema-level or a database-level operation, a list of relations to build is created which gets processed in parallel using multiple connections, based on the recent refactoring for parallel slots in src/bin/scripts/. System catalogs are processed first in a serialized fashion to prevent deadlocks, followed by the rest done in parallel. This new option is not compatible with --system as reindexing system catalogs in parallel can lead to deadlocks, and with --index as there is no conflict handling for indexes rebuilt in parallel depending in the same relation. Author: Julien Rouhaud Reviewed-by: Sergei Kornilov, Michael Paquier Discussion: https://postgr.es/m/CAOBaU_YrnH_Jqo46NhaJ7uRBiWWEcS40VNRQxgFbqYo9kApUsg@mail.gmail.com
This commit is contained in:
parent
4552c0f566
commit
5ab892c391
|
@ -166,6 +166,29 @@ PostgreSQL documentation
|
||||||
</listitem>
|
</listitem>
|
||||||
</varlistentry>
|
</varlistentry>
|
||||||
|
|
||||||
|
<varlistentry>
|
||||||
|
<term><option>-j <replaceable class="parameter">njobs</replaceable></option></term>
|
||||||
|
<term><option>--jobs=<replaceable class="parameter">njobs</replaceable></option></term>
|
||||||
|
<listitem>
|
||||||
|
<para>
|
||||||
|
Execute the reindex commands in parallel by running
|
||||||
|
<replaceable class="parameter">njobs</replaceable>
|
||||||
|
commands simultaneously. This option reduces the time of the
|
||||||
|
processing but it also increases the load on the database server.
|
||||||
|
</para>
|
||||||
|
<para>
|
||||||
|
<application>reindexdb</application> will open
|
||||||
|
<replaceable class="parameter">njobs</replaceable> connections to the
|
||||||
|
database, so make sure your <xref linkend="guc-max-connections"/>
|
||||||
|
setting is high enough to accommodate all connections.
|
||||||
|
</para>
|
||||||
|
<para>
|
||||||
|
Note that this option is incompatible with the <option>--index</option>
|
||||||
|
and <option>--system</option> options.
|
||||||
|
</para>
|
||||||
|
</listitem>
|
||||||
|
</varlistentry>
|
||||||
|
|
||||||
<varlistentry>
|
<varlistentry>
|
||||||
<term><option>-q</option></term>
|
<term><option>-q</option></term>
|
||||||
<term><option>--quiet</option></term>
|
<term><option>--quiet</option></term>
|
||||||
|
|
|
@ -29,7 +29,7 @@ dropdb: dropdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-
|
||||||
dropuser: dropuser.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
|
dropuser: dropuser.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
|
||||||
clusterdb: clusterdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
|
clusterdb: clusterdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
|
||||||
vacuumdb: vacuumdb.o common.o scripts_parallel.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
|
vacuumdb: vacuumdb.o common.o scripts_parallel.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
|
||||||
reindexdb: reindexdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
|
reindexdb: reindexdb.o common.o scripts_parallel.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
|
||||||
pg_isready: pg_isready.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
|
pg_isready: pg_isready.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
|
||||||
|
|
||||||
install: all installdirs
|
install: all installdirs
|
||||||
|
|
|
@ -10,10 +10,14 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "postgres_fe.h"
|
#include "postgres_fe.h"
|
||||||
|
|
||||||
|
#include "catalog/pg_class_d.h"
|
||||||
#include "common.h"
|
#include "common.h"
|
||||||
#include "common/logging.h"
|
#include "common/logging.h"
|
||||||
|
#include "fe_utils/connect.h"
|
||||||
#include "fe_utils/simple_list.h"
|
#include "fe_utils/simple_list.h"
|
||||||
#include "fe_utils/string_utils.h"
|
#include "fe_utils/string_utils.h"
|
||||||
|
#include "scripts_parallel.h"
|
||||||
|
|
||||||
typedef enum ReindexType
|
typedef enum ReindexType
|
||||||
{
|
{
|
||||||
|
@ -25,16 +29,26 @@ typedef enum ReindexType
|
||||||
} ReindexType;
|
} ReindexType;
|
||||||
|
|
||||||
|
|
||||||
static void reindex_one_database(const char *name, const char *dbname,
|
static SimpleStringList *get_parallel_object_list(PGconn *conn,
|
||||||
ReindexType type, const char *host,
|
ReindexType type,
|
||||||
|
SimpleStringList *user_list,
|
||||||
|
bool echo);
|
||||||
|
static void reindex_one_database(const char *dbname, ReindexType type,
|
||||||
|
SimpleStringList *user_list, const char *host,
|
||||||
const char *port, const char *username,
|
const char *port, const char *username,
|
||||||
enum trivalue prompt_password, const char *progname,
|
enum trivalue prompt_password, const char *progname,
|
||||||
bool echo, bool verbose, bool concurrently);
|
bool echo, bool verbose, bool concurrently,
|
||||||
|
int concurrentCons);
|
||||||
static void reindex_all_databases(const char *maintenance_db,
|
static void reindex_all_databases(const char *maintenance_db,
|
||||||
const char *host, const char *port,
|
const char *host, const char *port,
|
||||||
const char *username, enum trivalue prompt_password,
|
const char *username, enum trivalue prompt_password,
|
||||||
const char *progname, bool echo,
|
const char *progname, bool echo,
|
||||||
bool quiet, bool verbose, bool concurrently);
|
bool quiet, bool verbose, bool concurrently,
|
||||||
|
int concurrentCons);
|
||||||
|
static void run_reindex_command(PGconn *conn, ReindexType type,
|
||||||
|
const char *name, bool echo, bool verbose,
|
||||||
|
bool concurrently, bool async);
|
||||||
|
|
||||||
static void help(const char *progname);
|
static void help(const char *progname);
|
||||||
|
|
||||||
int
|
int
|
||||||
|
@ -54,6 +68,7 @@ main(int argc, char *argv[])
|
||||||
{"system", no_argument, NULL, 's'},
|
{"system", no_argument, NULL, 's'},
|
||||||
{"table", required_argument, NULL, 't'},
|
{"table", required_argument, NULL, 't'},
|
||||||
{"index", required_argument, NULL, 'i'},
|
{"index", required_argument, NULL, 'i'},
|
||||||
|
{"jobs", required_argument, NULL, 'j'},
|
||||||
{"verbose", no_argument, NULL, 'v'},
|
{"verbose", no_argument, NULL, 'v'},
|
||||||
{"concurrently", no_argument, NULL, 1},
|
{"concurrently", no_argument, NULL, 1},
|
||||||
{"maintenance-db", required_argument, NULL, 2},
|
{"maintenance-db", required_argument, NULL, 2},
|
||||||
|
@ -79,6 +94,7 @@ main(int argc, char *argv[])
|
||||||
SimpleStringList indexes = {NULL, NULL};
|
SimpleStringList indexes = {NULL, NULL};
|
||||||
SimpleStringList tables = {NULL, NULL};
|
SimpleStringList tables = {NULL, NULL};
|
||||||
SimpleStringList schemas = {NULL, NULL};
|
SimpleStringList schemas = {NULL, NULL};
|
||||||
|
int concurrentCons = 1;
|
||||||
|
|
||||||
pg_logging_init(argv[0]);
|
pg_logging_init(argv[0]);
|
||||||
progname = get_progname(argv[0]);
|
progname = get_progname(argv[0]);
|
||||||
|
@ -87,7 +103,7 @@ main(int argc, char *argv[])
|
||||||
handle_help_version_opts(argc, argv, "reindexdb", help);
|
handle_help_version_opts(argc, argv, "reindexdb", help);
|
||||||
|
|
||||||
/* process command-line options */
|
/* process command-line options */
|
||||||
while ((c = getopt_long(argc, argv, "h:p:U:wWeqS:d:ast:i:v", long_options, &optindex)) != -1)
|
while ((c = getopt_long(argc, argv, "h:p:U:wWeqS:d:ast:i:j:v", long_options, &optindex)) != -1)
|
||||||
{
|
{
|
||||||
switch (c)
|
switch (c)
|
||||||
{
|
{
|
||||||
|
@ -130,6 +146,20 @@ main(int argc, char *argv[])
|
||||||
case 'i':
|
case 'i':
|
||||||
simple_string_list_append(&indexes, optarg);
|
simple_string_list_append(&indexes, optarg);
|
||||||
break;
|
break;
|
||||||
|
case 'j':
|
||||||
|
concurrentCons = atoi(optarg);
|
||||||
|
if (concurrentCons <= 0)
|
||||||
|
{
|
||||||
|
pg_log_error("number of parallel jobs must be at least 1");
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
if (concurrentCons > FD_SETSIZE - 1)
|
||||||
|
{
|
||||||
|
pg_log_error("too many parallel jobs requested (maximum: %d)",
|
||||||
|
FD_SETSIZE - 1);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
break;
|
||||||
case 'v':
|
case 'v':
|
||||||
verbose = true;
|
verbose = true;
|
||||||
break;
|
break;
|
||||||
|
@ -194,7 +224,8 @@ main(int argc, char *argv[])
|
||||||
}
|
}
|
||||||
|
|
||||||
reindex_all_databases(maintenance_db, host, port, username,
|
reindex_all_databases(maintenance_db, host, port, username,
|
||||||
prompt_password, progname, echo, quiet, verbose, concurrently);
|
prompt_password, progname, echo, quiet, verbose,
|
||||||
|
concurrently, concurrentCons);
|
||||||
}
|
}
|
||||||
else if (syscatalog)
|
else if (syscatalog)
|
||||||
{
|
{
|
||||||
|
@ -214,6 +245,12 @@ main(int argc, char *argv[])
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (concurrentCons > 1)
|
||||||
|
{
|
||||||
|
pg_log_error("cannot use multiple jobs to reindex system catalogs");
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
if (dbname == NULL)
|
if (dbname == NULL)
|
||||||
{
|
{
|
||||||
if (getenv("PGDATABASE"))
|
if (getenv("PGDATABASE"))
|
||||||
|
@ -224,12 +261,23 @@ main(int argc, char *argv[])
|
||||||
dbname = get_user_name_or_exit(progname);
|
dbname = get_user_name_or_exit(progname);
|
||||||
}
|
}
|
||||||
|
|
||||||
reindex_one_database(NULL, dbname, REINDEX_SYSTEM, host,
|
reindex_one_database(dbname, REINDEX_SYSTEM, NULL, host,
|
||||||
port, username, prompt_password, progname,
|
port, username, prompt_password, progname,
|
||||||
echo, verbose, concurrently);
|
echo, verbose, concurrently, 1);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
/*
|
||||||
|
* Index-level REINDEX is not supported with multiple jobs as we
|
||||||
|
* cannot control the concurrent processing of multiple indexes
|
||||||
|
* depending on the same relation.
|
||||||
|
*/
|
||||||
|
if (concurrentCons > 1 && indexes.head != NULL)
|
||||||
|
{
|
||||||
|
pg_log_error("cannot use multiple jobs to reindex indexes");
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
if (dbname == NULL)
|
if (dbname == NULL)
|
||||||
{
|
{
|
||||||
if (getenv("PGDATABASE"))
|
if (getenv("PGDATABASE"))
|
||||||
|
@ -241,61 +289,49 @@ main(int argc, char *argv[])
|
||||||
}
|
}
|
||||||
|
|
||||||
if (schemas.head != NULL)
|
if (schemas.head != NULL)
|
||||||
{
|
reindex_one_database(dbname, REINDEX_SCHEMA, &schemas, host,
|
||||||
SimpleStringListCell *cell;
|
|
||||||
|
|
||||||
for (cell = schemas.head; cell; cell = cell->next)
|
|
||||||
{
|
|
||||||
reindex_one_database(cell->val, dbname, REINDEX_SCHEMA, host,
|
|
||||||
port, username, prompt_password, progname,
|
port, username, prompt_password, progname,
|
||||||
echo, verbose, concurrently);
|
echo, verbose, concurrently, concurrentCons);
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (indexes.head != NULL)
|
if (indexes.head != NULL)
|
||||||
{
|
reindex_one_database(dbname, REINDEX_INDEX, &indexes, host,
|
||||||
SimpleStringListCell *cell;
|
|
||||||
|
|
||||||
for (cell = indexes.head; cell; cell = cell->next)
|
|
||||||
{
|
|
||||||
reindex_one_database(cell->val, dbname, REINDEX_INDEX, host,
|
|
||||||
port, username, prompt_password, progname,
|
port, username, prompt_password, progname,
|
||||||
echo, verbose, concurrently);
|
echo, verbose, concurrently, 1);
|
||||||
}
|
|
||||||
}
|
|
||||||
if (tables.head != NULL)
|
if (tables.head != NULL)
|
||||||
{
|
reindex_one_database(dbname, REINDEX_TABLE, &tables, host,
|
||||||
SimpleStringListCell *cell;
|
|
||||||
|
|
||||||
for (cell = tables.head; cell; cell = cell->next)
|
|
||||||
{
|
|
||||||
reindex_one_database(cell->val, dbname, REINDEX_TABLE, host,
|
|
||||||
port, username, prompt_password, progname,
|
port, username, prompt_password, progname,
|
||||||
echo, verbose, concurrently);
|
echo, verbose, concurrently,
|
||||||
}
|
concurrentCons);
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* reindex database only if neither index nor table nor schema is
|
* reindex database only if neither index nor table nor schema is
|
||||||
* specified
|
* specified
|
||||||
*/
|
*/
|
||||||
if (indexes.head == NULL && tables.head == NULL && schemas.head == NULL)
|
if (indexes.head == NULL && tables.head == NULL && schemas.head == NULL)
|
||||||
reindex_one_database(NULL, dbname, REINDEX_DATABASE, host,
|
reindex_one_database(dbname, REINDEX_DATABASE, NULL, host,
|
||||||
port, username, prompt_password, progname,
|
port, username, prompt_password, progname,
|
||||||
echo, verbose, concurrently);
|
echo, verbose, concurrently, concurrentCons);
|
||||||
}
|
}
|
||||||
|
|
||||||
exit(0);
|
exit(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
reindex_one_database(const char *name, const char *dbname, ReindexType type,
|
reindex_one_database(const char *dbname, ReindexType type,
|
||||||
const char *host, const char *port, const char *username,
|
SimpleStringList *user_list, const char *host,
|
||||||
|
const char *port, const char *username,
|
||||||
enum trivalue prompt_password, const char *progname, bool echo,
|
enum trivalue prompt_password, const char *progname, bool echo,
|
||||||
bool verbose, bool concurrently)
|
bool verbose, bool concurrently, int concurrentCons)
|
||||||
{
|
{
|
||||||
PQExpBufferData sql;
|
|
||||||
PGconn *conn;
|
PGconn *conn;
|
||||||
|
SimpleStringListCell *cell;
|
||||||
|
bool parallel = concurrentCons > 1;
|
||||||
|
SimpleStringList *process_list = user_list;
|
||||||
|
ReindexType process_type = type;
|
||||||
|
ParallelSlot *slots;
|
||||||
|
bool failed = false;
|
||||||
|
int items_count = 0;
|
||||||
|
|
||||||
conn = connectDatabase(dbname, host, port, username, prompt_password,
|
conn = connectDatabase(dbname, host, port, username, prompt_password,
|
||||||
progname, echo, false, false);
|
progname, echo, false, false);
|
||||||
|
@ -308,6 +344,151 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type,
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!parallel)
|
||||||
|
{
|
||||||
|
switch (process_type)
|
||||||
|
{
|
||||||
|
case REINDEX_DATABASE:
|
||||||
|
case REINDEX_SYSTEM:
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Database and system reindexes only need to work on the
|
||||||
|
* database itself, so build a list with a single entry.
|
||||||
|
*/
|
||||||
|
Assert(user_list == NULL);
|
||||||
|
process_list = pg_malloc0(sizeof(SimpleStringList));
|
||||||
|
simple_string_list_append(process_list, PQdb(conn));
|
||||||
|
break;
|
||||||
|
|
||||||
|
case REINDEX_INDEX:
|
||||||
|
case REINDEX_SCHEMA:
|
||||||
|
case REINDEX_TABLE:
|
||||||
|
Assert(user_list != NULL);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
switch (process_type)
|
||||||
|
{
|
||||||
|
case REINDEX_DATABASE:
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Database-wide parallel reindex requires special processing.
|
||||||
|
* If multiple jobs were asked, we have to reindex system
|
||||||
|
* catalogs first as they cannot be processed in parallel.
|
||||||
|
*/
|
||||||
|
if (concurrently)
|
||||||
|
pg_log_warning("cannot reindex system catalogs concurrently, skipping all");
|
||||||
|
else
|
||||||
|
run_reindex_command(conn, REINDEX_SYSTEM, PQdb(conn), echo,
|
||||||
|
verbose, concurrently, false);
|
||||||
|
|
||||||
|
/* Build a list of relations from the database */
|
||||||
|
process_list = get_parallel_object_list(conn, process_type,
|
||||||
|
user_list, echo);
|
||||||
|
process_type = REINDEX_TABLE;
|
||||||
|
|
||||||
|
/* Bail out if nothing to process */
|
||||||
|
if (process_list == NULL)
|
||||||
|
return;
|
||||||
|
break;
|
||||||
|
|
||||||
|
case REINDEX_SCHEMA:
|
||||||
|
Assert(user_list != NULL);
|
||||||
|
|
||||||
|
/* Build a list of relations from all the schemas */
|
||||||
|
process_list = get_parallel_object_list(conn, process_type,
|
||||||
|
user_list, echo);
|
||||||
|
process_type = REINDEX_TABLE;
|
||||||
|
|
||||||
|
/* Bail out if nothing to process */
|
||||||
|
if (process_list == NULL)
|
||||||
|
return;
|
||||||
|
break;
|
||||||
|
|
||||||
|
case REINDEX_SYSTEM:
|
||||||
|
case REINDEX_INDEX:
|
||||||
|
/* not supported */
|
||||||
|
Assert(false);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case REINDEX_TABLE:
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Fall through. The list of items for tables is already
|
||||||
|
* created.
|
||||||
|
*/
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Adjust the number of concurrent connections depending on the items in
|
||||||
|
* the list. We choose the minimum between the number of concurrent
|
||||||
|
* connections and the number of items in the list.
|
||||||
|
*/
|
||||||
|
for (cell = process_list->head; cell; cell = cell->next)
|
||||||
|
{
|
||||||
|
items_count++;
|
||||||
|
|
||||||
|
/* no need to continue if there are more elements than jobs */
|
||||||
|
if (items_count >= concurrentCons)
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
concurrentCons = Min(concurrentCons, items_count);
|
||||||
|
Assert(concurrentCons > 0);
|
||||||
|
|
||||||
|
Assert(process_list != NULL);
|
||||||
|
|
||||||
|
slots = ParallelSlotsSetup(dbname, host, port, username, prompt_password,
|
||||||
|
progname, echo, conn, concurrentCons);
|
||||||
|
|
||||||
|
cell = process_list->head;
|
||||||
|
do
|
||||||
|
{
|
||||||
|
const char *objname = cell->val;
|
||||||
|
ParallelSlot *free_slot = NULL;
|
||||||
|
|
||||||
|
if (CancelRequested)
|
||||||
|
{
|
||||||
|
failed = true;
|
||||||
|
goto finish;
|
||||||
|
}
|
||||||
|
|
||||||
|
free_slot = ParallelSlotsGetIdle(slots, concurrentCons);
|
||||||
|
if (!free_slot)
|
||||||
|
{
|
||||||
|
failed = true;
|
||||||
|
goto finish;
|
||||||
|
}
|
||||||
|
|
||||||
|
run_reindex_command(free_slot->connection, process_type, objname,
|
||||||
|
echo, verbose, concurrently, true);
|
||||||
|
|
||||||
|
cell = cell->next;
|
||||||
|
} while (cell != NULL);
|
||||||
|
|
||||||
|
if (!ParallelSlotsWaitCompletion(slots, concurrentCons))
|
||||||
|
failed = true;
|
||||||
|
|
||||||
|
finish:
|
||||||
|
ParallelSlotsTerminate(slots, concurrentCons);
|
||||||
|
pfree(slots);
|
||||||
|
|
||||||
|
if (failed)
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
run_reindex_command(PGconn *conn, ReindexType type, const char *name,
|
||||||
|
bool echo, bool verbose, bool concurrently, bool async)
|
||||||
|
{
|
||||||
|
PQExpBufferData sql;
|
||||||
|
bool status;
|
||||||
|
|
||||||
|
Assert(name);
|
||||||
|
|
||||||
/* build the REINDEX query */
|
/* build the REINDEX query */
|
||||||
initPQExpBuffer(&sql);
|
initPQExpBuffer(&sql);
|
||||||
|
|
||||||
|
@ -344,7 +525,7 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type,
|
||||||
{
|
{
|
||||||
case REINDEX_DATABASE:
|
case REINDEX_DATABASE:
|
||||||
case REINDEX_SYSTEM:
|
case REINDEX_SYSTEM:
|
||||||
appendPQExpBufferStr(&sql, fmtId(PQdb(conn)));
|
appendPQExpBufferStr(&sql, fmtId(name));
|
||||||
break;
|
break;
|
||||||
case REINDEX_INDEX:
|
case REINDEX_INDEX:
|
||||||
case REINDEX_TABLE:
|
case REINDEX_TABLE:
|
||||||
|
@ -358,7 +539,17 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type,
|
||||||
/* finish the query */
|
/* finish the query */
|
||||||
appendPQExpBufferChar(&sql, ';');
|
appendPQExpBufferChar(&sql, ';');
|
||||||
|
|
||||||
if (!executeMaintenanceCommand(conn, sql.data, echo))
|
if (async)
|
||||||
|
{
|
||||||
|
if (echo)
|
||||||
|
printf("%s\n", sql.data);
|
||||||
|
|
||||||
|
status = PQsendQuery(conn, sql.data) == 1;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
status = executeMaintenanceCommand(conn, sql.data, echo);
|
||||||
|
|
||||||
|
if (!status)
|
||||||
{
|
{
|
||||||
switch (type)
|
switch (type)
|
||||||
{
|
{
|
||||||
|
@ -383,20 +574,141 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type,
|
||||||
name, PQdb(conn), PQerrorMessage(conn));
|
name, PQdb(conn), PQerrorMessage(conn));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
if (!async)
|
||||||
|
{
|
||||||
PQfinish(conn);
|
PQfinish(conn);
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
PQfinish(conn);
|
|
||||||
termPQExpBuffer(&sql);
|
termPQExpBuffer(&sql);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Prepare the list of objects to process by querying the catalogs.
|
||||||
|
*
|
||||||
|
* This function will return a SimpleStringList object containing the entire
|
||||||
|
* list of tables in the given database that should be processed by a parallel
|
||||||
|
* database-wide reindex (excluding system tables), or NULL if there's no such
|
||||||
|
* table.
|
||||||
|
*/
|
||||||
|
static SimpleStringList *
|
||||||
|
get_parallel_object_list(PGconn *conn, ReindexType type,
|
||||||
|
SimpleStringList *user_list, bool echo)
|
||||||
|
{
|
||||||
|
PQExpBufferData catalog_query;
|
||||||
|
PQExpBufferData buf;
|
||||||
|
PGresult *res;
|
||||||
|
SimpleStringList *tables;
|
||||||
|
int ntups,
|
||||||
|
i;
|
||||||
|
|
||||||
|
initPQExpBuffer(&catalog_query);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* The queries here are using a safe search_path, so there's no need to
|
||||||
|
* fully qualify everything.
|
||||||
|
*/
|
||||||
|
switch (type)
|
||||||
|
{
|
||||||
|
case REINDEX_DATABASE:
|
||||||
|
Assert(user_list == NULL);
|
||||||
|
appendPQExpBuffer(&catalog_query,
|
||||||
|
"SELECT c.relname, ns.nspname\n"
|
||||||
|
" FROM pg_catalog.pg_class c\n"
|
||||||
|
" JOIN pg_catalog.pg_namespace ns"
|
||||||
|
" ON c.relnamespace = ns.oid\n"
|
||||||
|
" WHERE ns.nspname != 'pg_catalog'\n"
|
||||||
|
" AND c.relkind IN ("
|
||||||
|
CppAsString2(RELKIND_RELATION) ", "
|
||||||
|
CppAsString2(RELKIND_MATVIEW) ")\n"
|
||||||
|
" ORDER BY c.relpages DESC;");
|
||||||
|
break;
|
||||||
|
|
||||||
|
case REINDEX_SCHEMA:
|
||||||
|
{
|
||||||
|
SimpleStringListCell *cell;
|
||||||
|
bool nsp_listed = false;
|
||||||
|
|
||||||
|
Assert(user_list != NULL);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* All the tables from all the listed schemas are grabbed at
|
||||||
|
* once.
|
||||||
|
*/
|
||||||
|
appendPQExpBuffer(&catalog_query,
|
||||||
|
"SELECT c.relname, ns.nspname\n"
|
||||||
|
" FROM pg_catalog.pg_class c\n"
|
||||||
|
" JOIN pg_catalog.pg_namespace ns"
|
||||||
|
" ON c.relnamespace = ns.oid\n"
|
||||||
|
" WHERE c.relkind IN ("
|
||||||
|
CppAsString2(RELKIND_RELATION) ", "
|
||||||
|
CppAsString2(RELKIND_MATVIEW) ")\n"
|
||||||
|
" AND ns.nspname IN (");
|
||||||
|
|
||||||
|
for (cell = user_list->head; cell; cell = cell->next)
|
||||||
|
{
|
||||||
|
const char *nspname = cell->val;
|
||||||
|
|
||||||
|
if (nsp_listed)
|
||||||
|
appendPQExpBuffer(&catalog_query, ", ");
|
||||||
|
else
|
||||||
|
nsp_listed = true;
|
||||||
|
|
||||||
|
appendStringLiteralConn(&catalog_query, nspname, conn);
|
||||||
|
}
|
||||||
|
|
||||||
|
appendPQExpBuffer(&catalog_query, ")\n"
|
||||||
|
" ORDER BY c.relpages DESC;");
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case REINDEX_SYSTEM:
|
||||||
|
case REINDEX_INDEX:
|
||||||
|
case REINDEX_TABLE:
|
||||||
|
Assert(false);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
res = executeQuery(conn, catalog_query.data, echo);
|
||||||
|
termPQExpBuffer(&catalog_query);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If no rows are returned, there are no matching tables, so we are done.
|
||||||
|
*/
|
||||||
|
ntups = PQntuples(res);
|
||||||
|
if (ntups == 0)
|
||||||
|
{
|
||||||
|
PQclear(res);
|
||||||
|
PQfinish(conn);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
tables = pg_malloc0(sizeof(SimpleStringList));
|
||||||
|
|
||||||
|
/* Build qualified identifiers for each table */
|
||||||
|
initPQExpBuffer(&buf);
|
||||||
|
for (i = 0; i < ntups; i++)
|
||||||
|
{
|
||||||
|
appendPQExpBufferStr(&buf,
|
||||||
|
fmtQualifiedId(PQgetvalue(res, i, 1),
|
||||||
|
PQgetvalue(res, i, 0)));
|
||||||
|
|
||||||
|
simple_string_list_append(tables, buf.data);
|
||||||
|
resetPQExpBuffer(&buf);
|
||||||
|
}
|
||||||
|
termPQExpBuffer(&buf);
|
||||||
|
PQclear(res);
|
||||||
|
|
||||||
|
return tables;
|
||||||
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
reindex_all_databases(const char *maintenance_db,
|
reindex_all_databases(const char *maintenance_db,
|
||||||
const char *host, const char *port,
|
const char *host, const char *port,
|
||||||
const char *username, enum trivalue prompt_password,
|
const char *username, enum trivalue prompt_password,
|
||||||
const char *progname, bool echo, bool quiet, bool verbose,
|
const char *progname, bool echo, bool quiet, bool verbose,
|
||||||
bool concurrently)
|
bool concurrently, int concurrentCons)
|
||||||
{
|
{
|
||||||
PGconn *conn;
|
PGconn *conn;
|
||||||
PGresult *result;
|
PGresult *result;
|
||||||
|
@ -423,9 +735,10 @@ reindex_all_databases(const char *maintenance_db,
|
||||||
appendPQExpBufferStr(&connstr, "dbname=");
|
appendPQExpBufferStr(&connstr, "dbname=");
|
||||||
appendConnStrVal(&connstr, dbname);
|
appendConnStrVal(&connstr, dbname);
|
||||||
|
|
||||||
reindex_one_database(NULL, connstr.data, REINDEX_DATABASE, host,
|
reindex_one_database(connstr.data, REINDEX_DATABASE, NULL, host,
|
||||||
port, username, prompt_password,
|
port, username, prompt_password,
|
||||||
progname, echo, verbose, concurrently);
|
progname, echo, verbose, concurrently,
|
||||||
|
concurrentCons);
|
||||||
}
|
}
|
||||||
termPQExpBuffer(&connstr);
|
termPQExpBuffer(&connstr);
|
||||||
|
|
||||||
|
@ -444,6 +757,7 @@ help(const char *progname)
|
||||||
printf(_(" -d, --dbname=DBNAME database to reindex\n"));
|
printf(_(" -d, --dbname=DBNAME database to reindex\n"));
|
||||||
printf(_(" -e, --echo show the commands being sent to the server\n"));
|
printf(_(" -e, --echo show the commands being sent to the server\n"));
|
||||||
printf(_(" -i, --index=INDEX recreate specific index(es) only\n"));
|
printf(_(" -i, --index=INDEX recreate specific index(es) only\n"));
|
||||||
|
printf(_(" -j, --jobs=NUM use this many concurrent connections to reindex\n"));
|
||||||
printf(_(" -q, --quiet don't write any messages\n"));
|
printf(_(" -q, --quiet don't write any messages\n"));
|
||||||
printf(_(" -s, --system reindex system catalogs\n"));
|
printf(_(" -s, --system reindex system catalogs\n"));
|
||||||
printf(_(" -S, --schema=SCHEMA reindex specific schema(s) only\n"));
|
printf(_(" -S, --schema=SCHEMA reindex specific schema(s) only\n"));
|
||||||
|
|
|
@ -3,7 +3,7 @@ use warnings;
|
||||||
|
|
||||||
use PostgresNode;
|
use PostgresNode;
|
||||||
use TestLib;
|
use TestLib;
|
||||||
use Test::More tests => 34;
|
use Test::More tests => 44;
|
||||||
|
|
||||||
program_help_ok('reindexdb');
|
program_help_ok('reindexdb');
|
||||||
program_version_ok('reindexdb');
|
program_version_ok('reindexdb');
|
||||||
|
@ -77,3 +77,45 @@ $node->command_ok(
|
||||||
$node->command_ok(
|
$node->command_ok(
|
||||||
[qw(reindexdb --echo --system dbname=template1)],
|
[qw(reindexdb --echo --system dbname=template1)],
|
||||||
'reindexdb system with connection string');
|
'reindexdb system with connection string');
|
||||||
|
|
||||||
|
# parallel processing
|
||||||
|
$node->safe_psql(
|
||||||
|
'postgres', q|
|
||||||
|
CREATE SCHEMA s1;
|
||||||
|
CREATE TABLE s1.t1(id integer);
|
||||||
|
CREATE INDEX ON s1.t1(id);
|
||||||
|
CREATE SCHEMA s2;
|
||||||
|
CREATE TABLE s2.t2(id integer);
|
||||||
|
CREATE INDEX ON s2.t2(id);
|
||||||
|
-- empty schema
|
||||||
|
CREATE SCHEMA s3;
|
||||||
|
|);
|
||||||
|
|
||||||
|
$node->command_fails(
|
||||||
|
[ 'reindexdb', '-j', '2', '-s', 'postgres' ],
|
||||||
|
'parallel reindexdb cannot process system catalogs');
|
||||||
|
$node->command_fails(
|
||||||
|
[ 'reindexdb', '-j', '2', '-i', 'i1', 'postgres' ],
|
||||||
|
'parallel reindexdb cannot process indexes');
|
||||||
|
$node->issues_sql_like(
|
||||||
|
[ 'reindexdb', '-j', '2', 'postgres' ],
|
||||||
|
qr/statement:\ REINDEX SYSTEM postgres;
|
||||||
|
.*statement:\ REINDEX TABLE public\.test1/s,
|
||||||
|
'parallel reindexdb for database issues REINDEX SYSTEM first');
|
||||||
|
# Note that the ordering of the commands is not stable, so the second
|
||||||
|
# command for s2.t2 is not checked after.
|
||||||
|
$node->issues_sql_like(
|
||||||
|
[ 'reindexdb', '-j', '2', '-S', 's1', '-S', 's2', 'postgres' ],
|
||||||
|
qr/statement:\ REINDEX TABLE s1.t1;/,
|
||||||
|
'parallel reindexdb for schemas does a per-table REINDEX');
|
||||||
|
$node->command_ok(
|
||||||
|
['reindexdb', '-j', '2', '-S', 's3'],
|
||||||
|
'parallel reindexdb with empty schema');
|
||||||
|
$node->command_checks_all(
|
||||||
|
[ 'reindexdb', '-j', '2', '--concurrently', '-d', 'postgres' ],
|
||||||
|
0,
|
||||||
|
[qr/^$/],
|
||||||
|
[
|
||||||
|
qr/^reindexdb: warning: cannot reindex system catalogs concurrently, skipping all/s
|
||||||
|
],
|
||||||
|
'parallel reindexdb for system with --concurrently skips catalogs');
|
||||||
|
|
Loading…
Reference in New Issue