Add support for --jobs in reindexdb

When doing a schema-level or a database-level operation, a list of
relations to build is created which gets processed in parallel using
multiple connections, based on the recent refactoring for parallel slots
in src/bin/scripts/.  System catalogs are processed first in a
serialized fashion to prevent deadlocks, followed by the rest done in
parallel.

This new option is not compatible with --system as reindexing system
catalogs in parallel can lead to deadlocks, and with --index as there is
no conflict handling for indexes rebuilt in parallel depending on the
same relation.

Author: Julien Rouhaud
Reviewed-by: Sergei Kornilov, Michael Paquier
Discussion: https://postgr.es/m/CAOBaU_YrnH_Jqo46NhaJ7uRBiWWEcS40VNRQxgFbqYo9kApUsg@mail.gmail.com
This commit is contained in:
Michael Paquier 2019-07-27 22:21:18 +09:00
parent 4552c0f566
commit 5ab892c391
4 changed files with 432 additions and 53 deletions

View File

@ -166,6 +166,29 @@ PostgreSQL documentation
</listitem>
</varlistentry>
<varlistentry>
<term><option>-j <replaceable class="parameter">njobs</replaceable></option></term>
<term><option>--jobs=<replaceable class="parameter">njobs</replaceable></option></term>
<listitem>
<para>
Execute the reindex commands in parallel by running
<replaceable class="parameter">njobs</replaceable>
commands simultaneously. This option may reduce the processing time,
but it also increases the load on the database server.
</para>
<para>
<application>reindexdb</application> will open
<replaceable class="parameter">njobs</replaceable> connections to the
database, so make sure your <xref linkend="guc-max-connections"/>
setting is high enough to accommodate all connections.
</para>
<para>
Note that this option is incompatible with the <option>--index</option>
and <option>--system</option> options.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-q</option></term>
<term><option>--quiet</option></term>

View File

@ -29,7 +29,7 @@ dropdb: dropdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-
dropuser: dropuser.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
clusterdb: clusterdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
vacuumdb: vacuumdb.o common.o scripts_parallel.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
reindexdb: reindexdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
reindexdb: reindexdb.o common.o scripts_parallel.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
pg_isready: pg_isready.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
install: all installdirs

View File

@ -10,10 +10,14 @@
*/
#include "postgres_fe.h"
#include "catalog/pg_class_d.h"
#include "common.h"
#include "common/logging.h"
#include "fe_utils/connect.h"
#include "fe_utils/simple_list.h"
#include "fe_utils/string_utils.h"
#include "scripts_parallel.h"
typedef enum ReindexType
{
@ -25,16 +29,26 @@ typedef enum ReindexType
} ReindexType;
static void reindex_one_database(const char *name, const char *dbname,
ReindexType type, const char *host,
static SimpleStringList *get_parallel_object_list(PGconn *conn,
ReindexType type,
SimpleStringList *user_list,
bool echo);
static void reindex_one_database(const char *dbname, ReindexType type,
SimpleStringList *user_list, const char *host,
const char *port, const char *username,
enum trivalue prompt_password, const char *progname,
bool echo, bool verbose, bool concurrently);
bool echo, bool verbose, bool concurrently,
int concurrentCons);
static void reindex_all_databases(const char *maintenance_db,
const char *host, const char *port,
const char *username, enum trivalue prompt_password,
const char *progname, bool echo,
bool quiet, bool verbose, bool concurrently);
bool quiet, bool verbose, bool concurrently,
int concurrentCons);
static void run_reindex_command(PGconn *conn, ReindexType type,
const char *name, bool echo, bool verbose,
bool concurrently, bool async);
static void help(const char *progname);
int
@ -54,6 +68,7 @@ main(int argc, char *argv[])
{"system", no_argument, NULL, 's'},
{"table", required_argument, NULL, 't'},
{"index", required_argument, NULL, 'i'},
{"jobs", required_argument, NULL, 'j'},
{"verbose", no_argument, NULL, 'v'},
{"concurrently", no_argument, NULL, 1},
{"maintenance-db", required_argument, NULL, 2},
@ -79,6 +94,7 @@ main(int argc, char *argv[])
SimpleStringList indexes = {NULL, NULL};
SimpleStringList tables = {NULL, NULL};
SimpleStringList schemas = {NULL, NULL};
int concurrentCons = 1;
pg_logging_init(argv[0]);
progname = get_progname(argv[0]);
@ -87,7 +103,7 @@ main(int argc, char *argv[])
handle_help_version_opts(argc, argv, "reindexdb", help);
/* process command-line options */
while ((c = getopt_long(argc, argv, "h:p:U:wWeqS:d:ast:i:v", long_options, &optindex)) != -1)
while ((c = getopt_long(argc, argv, "h:p:U:wWeqS:d:ast:i:j:v", long_options, &optindex)) != -1)
{
switch (c)
{
@ -130,6 +146,20 @@ main(int argc, char *argv[])
case 'i':
simple_string_list_append(&indexes, optarg);
break;
case 'j':
concurrentCons = atoi(optarg);
if (concurrentCons <= 0)
{
pg_log_error("number of parallel jobs must be at least 1");
exit(1);
}
if (concurrentCons > FD_SETSIZE - 1)
{
pg_log_error("too many parallel jobs requested (maximum: %d)",
FD_SETSIZE - 1);
exit(1);
}
break;
case 'v':
verbose = true;
break;
@ -194,7 +224,8 @@ main(int argc, char *argv[])
}
reindex_all_databases(maintenance_db, host, port, username,
prompt_password, progname, echo, quiet, verbose, concurrently);
prompt_password, progname, echo, quiet, verbose,
concurrently, concurrentCons);
}
else if (syscatalog)
{
@ -214,6 +245,12 @@ main(int argc, char *argv[])
exit(1);
}
if (concurrentCons > 1)
{
pg_log_error("cannot use multiple jobs to reindex system catalogs");
exit(1);
}
if (dbname == NULL)
{
if (getenv("PGDATABASE"))
@ -224,12 +261,23 @@ main(int argc, char *argv[])
dbname = get_user_name_or_exit(progname);
}
reindex_one_database(NULL, dbname, REINDEX_SYSTEM, host,
reindex_one_database(dbname, REINDEX_SYSTEM, NULL, host,
port, username, prompt_password, progname,
echo, verbose, concurrently);
echo, verbose, concurrently, 1);
}
else
{
/*
* Index-level REINDEX is not supported with multiple jobs as we
* cannot control the concurrent processing of multiple indexes
* depending on the same relation.
*/
if (concurrentCons > 1 && indexes.head != NULL)
{
pg_log_error("cannot use multiple jobs to reindex indexes");
exit(1);
}
if (dbname == NULL)
{
if (getenv("PGDATABASE"))
@ -241,61 +289,49 @@ main(int argc, char *argv[])
}
if (schemas.head != NULL)
{
SimpleStringListCell *cell;
for (cell = schemas.head; cell; cell = cell->next)
{
reindex_one_database(cell->val, dbname, REINDEX_SCHEMA, host,
port, username, prompt_password, progname,
echo, verbose, concurrently);
}
}
reindex_one_database(dbname, REINDEX_SCHEMA, &schemas, host,
port, username, prompt_password, progname,
echo, verbose, concurrently, concurrentCons);
if (indexes.head != NULL)
{
SimpleStringListCell *cell;
reindex_one_database(dbname, REINDEX_INDEX, &indexes, host,
port, username, prompt_password, progname,
echo, verbose, concurrently, 1);
for (cell = indexes.head; cell; cell = cell->next)
{
reindex_one_database(cell->val, dbname, REINDEX_INDEX, host,
port, username, prompt_password, progname,
echo, verbose, concurrently);
}
}
if (tables.head != NULL)
{
SimpleStringListCell *cell;
for (cell = tables.head; cell; cell = cell->next)
{
reindex_one_database(cell->val, dbname, REINDEX_TABLE, host,
port, username, prompt_password, progname,
echo, verbose, concurrently);
}
}
reindex_one_database(dbname, REINDEX_TABLE, &tables, host,
port, username, prompt_password, progname,
echo, verbose, concurrently,
concurrentCons);
/*
* reindex database only if neither index nor table nor schema is
* specified
*/
if (indexes.head == NULL && tables.head == NULL && schemas.head == NULL)
reindex_one_database(NULL, dbname, REINDEX_DATABASE, host,
reindex_one_database(dbname, REINDEX_DATABASE, NULL, host,
port, username, prompt_password, progname,
echo, verbose, concurrently);
echo, verbose, concurrently, concurrentCons);
}
exit(0);
}
static void
reindex_one_database(const char *name, const char *dbname, ReindexType type,
const char *host, const char *port, const char *username,
reindex_one_database(const char *dbname, ReindexType type,
SimpleStringList *user_list, const char *host,
const char *port, const char *username,
enum trivalue prompt_password, const char *progname, bool echo,
bool verbose, bool concurrently)
bool verbose, bool concurrently, int concurrentCons)
{
PQExpBufferData sql;
PGconn *conn;
SimpleStringListCell *cell;
bool parallel = concurrentCons > 1;
SimpleStringList *process_list = user_list;
ReindexType process_type = type;
ParallelSlot *slots;
bool failed = false;
int items_count = 0;
conn = connectDatabase(dbname, host, port, username, prompt_password,
progname, echo, false, false);
@ -308,6 +344,151 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type,
exit(1);
}
if (!parallel)
{
switch (process_type)
{
case REINDEX_DATABASE:
case REINDEX_SYSTEM:
/*
* Database and system reindexes only need to work on the
* database itself, so build a list with a single entry.
*/
Assert(user_list == NULL);
process_list = pg_malloc0(sizeof(SimpleStringList));
simple_string_list_append(process_list, PQdb(conn));
break;
case REINDEX_INDEX:
case REINDEX_SCHEMA:
case REINDEX_TABLE:
Assert(user_list != NULL);
break;
}
}
else
{
switch (process_type)
{
case REINDEX_DATABASE:
/*
* Database-wide parallel reindex requires special processing.
* If multiple jobs were asked, we have to reindex system
* catalogs first as they cannot be processed in parallel.
*/
if (concurrently)
pg_log_warning("cannot reindex system catalogs concurrently, skipping all");
else
run_reindex_command(conn, REINDEX_SYSTEM, PQdb(conn), echo,
verbose, concurrently, false);
/* Build a list of relations from the database */
process_list = get_parallel_object_list(conn, process_type,
user_list, echo);
process_type = REINDEX_TABLE;
/* Bail out if nothing to process */
if (process_list == NULL)
return;
break;
case REINDEX_SCHEMA:
Assert(user_list != NULL);
/* Build a list of relations from all the schemas */
process_list = get_parallel_object_list(conn, process_type,
user_list, echo);
process_type = REINDEX_TABLE;
/* Bail out if nothing to process */
if (process_list == NULL)
return;
break;
case REINDEX_SYSTEM:
case REINDEX_INDEX:
/* not supported */
Assert(false);
break;
case REINDEX_TABLE:
/*
* Fall through. The list of items for tables is already
* created.
*/
break;
}
}
/*
* Adjust the number of concurrent connections depending on the items in
* the list. We choose the minimum between the number of concurrent
* connections and the number of items in the list.
*/
for (cell = process_list->head; cell; cell = cell->next)
{
items_count++;
/* no need to continue if there are more elements than jobs */
if (items_count >= concurrentCons)
break;
}
concurrentCons = Min(concurrentCons, items_count);
Assert(concurrentCons > 0);
Assert(process_list != NULL);
slots = ParallelSlotsSetup(dbname, host, port, username, prompt_password,
progname, echo, conn, concurrentCons);
cell = process_list->head;
do
{
const char *objname = cell->val;
ParallelSlot *free_slot = NULL;
if (CancelRequested)
{
failed = true;
goto finish;
}
free_slot = ParallelSlotsGetIdle(slots, concurrentCons);
if (!free_slot)
{
failed = true;
goto finish;
}
run_reindex_command(free_slot->connection, process_type, objname,
echo, verbose, concurrently, true);
cell = cell->next;
} while (cell != NULL);
if (!ParallelSlotsWaitCompletion(slots, concurrentCons))
failed = true;
finish:
ParallelSlotsTerminate(slots, concurrentCons);
pfree(slots);
if (failed)
exit(1);
}
static void
run_reindex_command(PGconn *conn, ReindexType type, const char *name,
bool echo, bool verbose, bool concurrently, bool async)
{
PQExpBufferData sql;
bool status;
Assert(name);
/* build the REINDEX query */
initPQExpBuffer(&sql);
@ -344,7 +525,7 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type,
{
case REINDEX_DATABASE:
case REINDEX_SYSTEM:
appendPQExpBufferStr(&sql, fmtId(PQdb(conn)));
appendPQExpBufferStr(&sql, fmtId(name));
break;
case REINDEX_INDEX:
case REINDEX_TABLE:
@ -358,7 +539,17 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type,
/* finish the query */
appendPQExpBufferChar(&sql, ';');
if (!executeMaintenanceCommand(conn, sql.data, echo))
if (async)
{
if (echo)
printf("%s\n", sql.data);
status = PQsendQuery(conn, sql.data) == 1;
}
else
status = executeMaintenanceCommand(conn, sql.data, echo);
if (!status)
{
switch (type)
{
@ -383,20 +574,141 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type,
name, PQdb(conn), PQerrorMessage(conn));
break;
}
PQfinish(conn);
exit(1);
if (!async)
{
PQfinish(conn);
exit(1);
}
}
PQfinish(conn);
termPQExpBuffer(&sql);
}
/*
 * Build the list of relations to process for a parallel reindex by
 * querying the catalogs.
 *
 * "type" selects the catalog query that is run:
 * - REINDEX_DATABASE: every table and materialized view that is not in
 *   the pg_catalog schema.  user_list must be NULL.
 * - REINDEX_SCHEMA: every table and materialized view located in one of
 *   the schemas named in user_list, which must not be NULL.
 * Any other type is not supported by this function.
 *
 * In both cases the rows are ordered by decreasing relpages.
 *
 * Returns a newly allocated SimpleStringList holding the schema-qualified
 * name of each relation found, in query order, or NULL if the query
 * returned no rows.  Beware that in the NULL case the connection is closed
 * with PQfinish() before returning, so the caller must not use "conn"
 * afterwards.
 */
static SimpleStringList *
get_parallel_object_list(PGconn *conn, ReindexType type,
						 SimpleStringList *user_list, bool echo)
{
	PQExpBufferData catalog_query;
	PGresult   *res;
	SimpleStringList *tables;
	int			ntups;
	int			i;

	initPQExpBuffer(&catalog_query);

	/*
	 * The queries here are using a safe search_path, so there's no need to
	 * fully qualify everything.
	 */
	switch (type)
	{
		case REINDEX_DATABASE:
			Assert(user_list == NULL);

			/* Constant text: no format processing is required */
			appendPQExpBufferStr(&catalog_query,
								 "SELECT c.relname, ns.nspname\n"
								 " FROM pg_catalog.pg_class c\n"
								 " JOIN pg_catalog.pg_namespace ns"
								 " ON c.relnamespace = ns.oid\n"
								 " WHERE ns.nspname != 'pg_catalog'\n"
								 " AND c.relkind IN ("
								 CppAsString2(RELKIND_RELATION) ", "
								 CppAsString2(RELKIND_MATVIEW) ")\n"
								 " ORDER BY c.relpages DESC;");
			break;

		case REINDEX_SCHEMA:
			{
				SimpleStringListCell *cell;
				bool		nsp_listed = false;

				Assert(user_list != NULL);

				/*
				 * All the tables from all the listed schemas are grabbed at
				 * once.
				 */
				appendPQExpBufferStr(&catalog_query,
									 "SELECT c.relname, ns.nspname\n"
									 " FROM pg_catalog.pg_class c\n"
									 " JOIN pg_catalog.pg_namespace ns"
									 " ON c.relnamespace = ns.oid\n"
									 " WHERE c.relkind IN ("
									 CppAsString2(RELKIND_RELATION) ", "
									 CppAsString2(RELKIND_MATVIEW) ")\n"
									 " AND ns.nspname IN (");

				for (cell = user_list->head; cell; cell = cell->next)
				{
					/* Separate entries with a comma, except before the first */
					if (nsp_listed)
						appendPQExpBufferStr(&catalog_query, ", ");
					else
						nsp_listed = true;

					/* Schema names come from the user: add them as literals */
					appendStringLiteralConn(&catalog_query, cell->val, conn);
				}

				appendPQExpBufferStr(&catalog_query, ")\n"
									 " ORDER BY c.relpages DESC;");
			}
			break;

		case REINDEX_SYSTEM:
		case REINDEX_INDEX:
		case REINDEX_TABLE:
			/* not supported */
			Assert(false);
			break;
	}

	res = executeQuery(conn, catalog_query.data, echo);
	termPQExpBuffer(&catalog_query);

	/*
	 * If no rows are returned, there are no matching tables, so we are done.
	 * Note that the connection is closed here, on behalf of the caller.
	 */
	ntups = PQntuples(res);
	if (ntups == 0)
	{
		PQclear(res);
		PQfinish(conn);
		return NULL;
	}

	tables = pg_malloc0(sizeof(SimpleStringList));

	/*
	 * Build qualified identifiers for each table.  Column 0 of the result is
	 * the relation name and column 1 the schema name.
	 * simple_string_list_append() stores its own copy of the string, so the
	 * value returned by fmtQualifiedId() can be handed over directly.
	 */
	for (i = 0; i < ntups; i++)
		simple_string_list_append(tables,
								  fmtQualifiedId(PQgetvalue(res, i, 1),
												 PQgetvalue(res, i, 0)));

	PQclear(res);

	return tables;
}
static void
reindex_all_databases(const char *maintenance_db,
const char *host, const char *port,
const char *username, enum trivalue prompt_password,
const char *progname, bool echo, bool quiet, bool verbose,
bool concurrently)
bool concurrently, int concurrentCons)
{
PGconn *conn;
PGresult *result;
@ -423,9 +735,10 @@ reindex_all_databases(const char *maintenance_db,
appendPQExpBufferStr(&connstr, "dbname=");
appendConnStrVal(&connstr, dbname);
reindex_one_database(NULL, connstr.data, REINDEX_DATABASE, host,
reindex_one_database(connstr.data, REINDEX_DATABASE, NULL, host,
port, username, prompt_password,
progname, echo, verbose, concurrently);
progname, echo, verbose, concurrently,
concurrentCons);
}
termPQExpBuffer(&connstr);
@ -444,6 +757,7 @@ help(const char *progname)
printf(_(" -d, --dbname=DBNAME database to reindex\n"));
printf(_(" -e, --echo show the commands being sent to the server\n"));
printf(_(" -i, --index=INDEX recreate specific index(es) only\n"));
printf(_(" -j, --jobs=NUM use this many concurrent connections to reindex\n"));
printf(_(" -q, --quiet don't write any messages\n"));
printf(_(" -s, --system reindex system catalogs\n"));
printf(_(" -S, --schema=SCHEMA reindex specific schema(s) only\n"));

View File

@ -3,7 +3,7 @@ use warnings;
use PostgresNode;
use TestLib;
use Test::More tests => 34;
use Test::More tests => 44;
program_help_ok('reindexdb');
program_version_ok('reindexdb');
@ -77,3 +77,45 @@ $node->command_ok(
$node->command_ok(
[qw(reindexdb --echo --system dbname=template1)],
'reindexdb system with connection string');
# parallel processing
$node->safe_psql(
'postgres', q|
CREATE SCHEMA s1;
CREATE TABLE s1.t1(id integer);
CREATE INDEX ON s1.t1(id);
CREATE SCHEMA s2;
CREATE TABLE s2.t2(id integer);
CREATE INDEX ON s2.t2(id);
-- empty schema
CREATE SCHEMA s3;
|);
$node->command_fails(
[ 'reindexdb', '-j', '2', '-s', 'postgres' ],
'parallel reindexdb cannot process system catalogs');
$node->command_fails(
[ 'reindexdb', '-j', '2', '-i', 'i1', 'postgres' ],
'parallel reindexdb cannot process indexes');
$node->issues_sql_like(
[ 'reindexdb', '-j', '2', 'postgres' ],
qr/statement:\ REINDEX SYSTEM postgres;
.*statement:\ REINDEX TABLE public\.test1/s,
'parallel reindexdb for database issues REINDEX SYSTEM first');
# Note that the ordering of the commands is not stable, so the second
# command for s2.t2 is not checked after.
$node->issues_sql_like(
[ 'reindexdb', '-j', '2', '-S', 's1', '-S', 's2', 'postgres' ],
qr/statement:\ REINDEX TABLE s1.t1;/,
'parallel reindexdb for schemas does a per-table REINDEX');
$node->command_ok(
['reindexdb', '-j', '2', '-S', 's3'],
'parallel reindexdb with empty schema');
$node->command_checks_all(
[ 'reindexdb', '-j', '2', '--concurrently', '-d', 'postgres' ],
0,
[qr/^$/],
[
qr/^reindexdb: warning: cannot reindex system catalogs concurrently, skipping all/s
],
'parallel reindexdb for system with --concurrently skips catalogs');