From 5ab892c391c6bc54a00e7a8de5cab077cabace6a Mon Sep 17 00:00:00 2001 From: Michael Paquier Date: Sat, 27 Jul 2019 22:21:18 +0900 Subject: [PATCH] Add support for --jobs in reindexdb When doing a schema-level or a database-level operation, a list of relations to build is created which gets processed in parallel using multiple connections, based on the recent refactoring for parallel slots in src/bin/scripts/. System catalogs are processed first in a serialized fashion to prevent deadlocks, followed by the rest done in parallel. This new option is not compatible with --system as reindexing system catalogs in parallel can lead to deadlocks, and with --index as there is no conflict handling for indexes rebuilt in parallel depending on the same relation. Author: Julien Rouhaud Reviewed-by: Sergei Kornilov, Michael Paquier Discussion: https://postgr.es/m/CAOBaU_YrnH_Jqo46NhaJ7uRBiWWEcS40VNRQxgFbqYo9kApUsg@mail.gmail.com --- doc/src/sgml/ref/reindexdb.sgml | 23 ++ src/bin/scripts/Makefile | 2 +- src/bin/scripts/reindexdb.c | 416 +++++++++++++++++++++++++---- src/bin/scripts/t/090_reindexdb.pl | 44 ++- 4 files changed, 432 insertions(+), 53 deletions(-) diff --git a/doc/src/sgml/ref/reindexdb.sgml b/doc/src/sgml/ref/reindexdb.sgml index 25b5a72770..5e21fbcc4e 100644 --- a/doc/src/sgml/ref/reindexdb.sgml +++ b/doc/src/sgml/ref/reindexdb.sgml @@ -166,6 +166,29 @@ PostgreSQL documentation + + + + + + Execute the reindex commands in parallel by running + njobs + commands simultaneously. This option reduces the time of the + processing but it also increases the load on the database server. + + + reindexdb will open + njobs connections to the + database, so make sure your <xref linkend="guc-max-connections"/> + setting is high enough to accommodate all connections. + + + Note that this option is incompatible with the <option>--index</option> + and <option>--system</option> options. 
+ + + + diff --git a/src/bin/scripts/Makefile b/src/bin/scripts/Makefile index 3cd793b134..ede665090f 100644 --- a/src/bin/scripts/Makefile +++ b/src/bin/scripts/Makefile @@ -29,7 +29,7 @@ dropdb: dropdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake- dropuser: dropuser.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils clusterdb: clusterdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils vacuumdb: vacuumdb.o common.o scripts_parallel.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils -reindexdb: reindexdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils +reindexdb: reindexdb.o common.o scripts_parallel.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils pg_isready: pg_isready.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils install: all installdirs diff --git a/src/bin/scripts/reindexdb.c b/src/bin/scripts/reindexdb.c index 219a9a9211..b2c0400cb9 100644 --- a/src/bin/scripts/reindexdb.c +++ b/src/bin/scripts/reindexdb.c @@ -10,10 +10,14 @@ */ #include "postgres_fe.h" + +#include "catalog/pg_class_d.h" #include "common.h" #include "common/logging.h" +#include "fe_utils/connect.h" #include "fe_utils/simple_list.h" #include "fe_utils/string_utils.h" +#include "scripts_parallel.h" typedef enum ReindexType { @@ -25,16 +29,26 @@ typedef enum ReindexType } ReindexType; -static void reindex_one_database(const char *name, const char *dbname, - ReindexType type, const char *host, +static SimpleStringList *get_parallel_object_list(PGconn *conn, + ReindexType type, + SimpleStringList *user_list, + bool echo); +static void reindex_one_database(const char *dbname, ReindexType type, + SimpleStringList *user_list, const char *host, const char *port, const char *username, enum trivalue prompt_password, const char *progname, - bool echo, bool verbose, bool concurrently); + bool echo, bool verbose, bool concurrently, + 
int concurrentCons); static void reindex_all_databases(const char *maintenance_db, const char *host, const char *port, const char *username, enum trivalue prompt_password, const char *progname, bool echo, - bool quiet, bool verbose, bool concurrently); + bool quiet, bool verbose, bool concurrently, + int concurrentCons); +static void run_reindex_command(PGconn *conn, ReindexType type, + const char *name, bool echo, bool verbose, + bool concurrently, bool async); + static void help(const char *progname); int @@ -54,6 +68,7 @@ main(int argc, char *argv[]) {"system", no_argument, NULL, 's'}, {"table", required_argument, NULL, 't'}, {"index", required_argument, NULL, 'i'}, + {"jobs", required_argument, NULL, 'j'}, {"verbose", no_argument, NULL, 'v'}, {"concurrently", no_argument, NULL, 1}, {"maintenance-db", required_argument, NULL, 2}, @@ -79,6 +94,7 @@ main(int argc, char *argv[]) SimpleStringList indexes = {NULL, NULL}; SimpleStringList tables = {NULL, NULL}; SimpleStringList schemas = {NULL, NULL}; + int concurrentCons = 1; pg_logging_init(argv[0]); progname = get_progname(argv[0]); @@ -87,7 +103,7 @@ main(int argc, char *argv[]) handle_help_version_opts(argc, argv, "reindexdb", help); /* process command-line options */ - while ((c = getopt_long(argc, argv, "h:p:U:wWeqS:d:ast:i:v", long_options, &optindex)) != -1) + while ((c = getopt_long(argc, argv, "h:p:U:wWeqS:d:ast:i:j:v", long_options, &optindex)) != -1) { switch (c) { @@ -130,6 +146,20 @@ main(int argc, char *argv[]) case 'i': simple_string_list_append(&indexes, optarg); break; + case 'j': + concurrentCons = atoi(optarg); + if (concurrentCons <= 0) + { + pg_log_error("number of parallel jobs must be at least 1"); + exit(1); + } + if (concurrentCons > FD_SETSIZE - 1) + { + pg_log_error("too many parallel jobs requested (maximum: %d)", + FD_SETSIZE - 1); + exit(1); + } + break; case 'v': verbose = true; break; @@ -194,7 +224,8 @@ main(int argc, char *argv[]) } reindex_all_databases(maintenance_db, host, port, 
username, - prompt_password, progname, echo, quiet, verbose, concurrently); + prompt_password, progname, echo, quiet, verbose, + concurrently, concurrentCons); } else if (syscatalog) { @@ -214,6 +245,12 @@ main(int argc, char *argv[]) exit(1); } + if (concurrentCons > 1) + { + pg_log_error("cannot use multiple jobs to reindex system catalogs"); + exit(1); + } + if (dbname == NULL) { if (getenv("PGDATABASE")) @@ -224,12 +261,23 @@ main(int argc, char *argv[]) dbname = get_user_name_or_exit(progname); } - reindex_one_database(NULL, dbname, REINDEX_SYSTEM, host, + reindex_one_database(dbname, REINDEX_SYSTEM, NULL, host, port, username, prompt_password, progname, - echo, verbose, concurrently); + echo, verbose, concurrently, 1); } else { + /* + * Index-level REINDEX is not supported with multiple jobs as we + * cannot control the concurrent processing of multiple indexes + * depending on the same relation. + */ + if (concurrentCons > 1 && indexes.head != NULL) + { + pg_log_error("cannot use multiple jobs to reindex indexes"); + exit(1); + } + if (dbname == NULL) { if (getenv("PGDATABASE")) @@ -241,61 +289,49 @@ main(int argc, char *argv[]) } if (schemas.head != NULL) - { - SimpleStringListCell *cell; - - for (cell = schemas.head; cell; cell = cell->next) - { - reindex_one_database(cell->val, dbname, REINDEX_SCHEMA, host, - port, username, prompt_password, progname, - echo, verbose, concurrently); - } - } + reindex_one_database(dbname, REINDEX_SCHEMA, &schemas, host, + port, username, prompt_password, progname, + echo, verbose, concurrently, concurrentCons); if (indexes.head != NULL) - { - SimpleStringListCell *cell; + reindex_one_database(dbname, REINDEX_INDEX, &indexes, host, + port, username, prompt_password, progname, + echo, verbose, concurrently, 1); - for (cell = indexes.head; cell; cell = cell->next) - { - reindex_one_database(cell->val, dbname, REINDEX_INDEX, host, - port, username, prompt_password, progname, - echo, verbose, concurrently); - } - } if 
(tables.head != NULL) - { - SimpleStringListCell *cell; - - for (cell = tables.head; cell; cell = cell->next) - { - reindex_one_database(cell->val, dbname, REINDEX_TABLE, host, - port, username, prompt_password, progname, - echo, verbose, concurrently); - } - } + reindex_one_database(dbname, REINDEX_TABLE, &tables, host, + port, username, prompt_password, progname, + echo, verbose, concurrently, + concurrentCons); /* * reindex database only if neither index nor table nor schema is * specified */ if (indexes.head == NULL && tables.head == NULL && schemas.head == NULL) - reindex_one_database(NULL, dbname, REINDEX_DATABASE, host, + reindex_one_database(dbname, REINDEX_DATABASE, NULL, host, port, username, prompt_password, progname, - echo, verbose, concurrently); + echo, verbose, concurrently, concurrentCons); } exit(0); } static void -reindex_one_database(const char *name, const char *dbname, ReindexType type, - const char *host, const char *port, const char *username, +reindex_one_database(const char *dbname, ReindexType type, + SimpleStringList *user_list, const char *host, + const char *port, const char *username, enum trivalue prompt_password, const char *progname, bool echo, - bool verbose, bool concurrently) + bool verbose, bool concurrently, int concurrentCons) { - PQExpBufferData sql; PGconn *conn; + SimpleStringListCell *cell; + bool parallel = concurrentCons > 1; + SimpleStringList *process_list = user_list; + ReindexType process_type = type; + ParallelSlot *slots; + bool failed = false; + int items_count = 0; conn = connectDatabase(dbname, host, port, username, prompt_password, progname, echo, false, false); @@ -308,6 +344,151 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type, exit(1); } + if (!parallel) + { + switch (process_type) + { + case REINDEX_DATABASE: + case REINDEX_SYSTEM: + + /* + * Database and system reindexes only need to work on the + * database itself, so build a list with a single entry. 
+ */ + Assert(user_list == NULL); + process_list = pg_malloc0(sizeof(SimpleStringList)); + simple_string_list_append(process_list, PQdb(conn)); + break; + + case REINDEX_INDEX: + case REINDEX_SCHEMA: + case REINDEX_TABLE: + Assert(user_list != NULL); + break; + } + } + else + { + switch (process_type) + { + case REINDEX_DATABASE: + + /* + * Database-wide parallel reindex requires special processing. + * If multiple jobs were asked, we have to reindex system + * catalogs first as they cannot be processed in parallel. + */ + if (concurrently) + pg_log_warning("cannot reindex system catalogs concurrently, skipping all"); + else + run_reindex_command(conn, REINDEX_SYSTEM, PQdb(conn), echo, + verbose, concurrently, false); + + /* Build a list of relations from the database */ + process_list = get_parallel_object_list(conn, process_type, + user_list, echo); + process_type = REINDEX_TABLE; + + /* Bail out if nothing to process */ + if (process_list == NULL) + return; + break; + + case REINDEX_SCHEMA: + Assert(user_list != NULL); + + /* Build a list of relations from all the schemas */ + process_list = get_parallel_object_list(conn, process_type, + user_list, echo); + process_type = REINDEX_TABLE; + + /* Bail out if nothing to process */ + if (process_list == NULL) + return; + break; + + case REINDEX_SYSTEM: + case REINDEX_INDEX: + /* not supported */ + Assert(false); + break; + + case REINDEX_TABLE: + + /* + * Fall through. The list of items for tables is already + * created. + */ + break; + } + } + + /* + * Adjust the number of concurrent connections depending on the items in + * the list. We choose the minimum between the number of concurrent + * connections and the number of items in the list. 
+ */ + for (cell = process_list->head; cell; cell = cell->next) + { + items_count++; + + /* no need to continue if there are more elements than jobs */ + if (items_count >= concurrentCons) + break; + } + concurrentCons = Min(concurrentCons, items_count); + Assert(concurrentCons > 0); + + Assert(process_list != NULL); + + slots = ParallelSlotsSetup(dbname, host, port, username, prompt_password, + progname, echo, conn, concurrentCons); + + cell = process_list->head; + do + { + const char *objname = cell->val; + ParallelSlot *free_slot = NULL; + + if (CancelRequested) + { + failed = true; + goto finish; + } + + free_slot = ParallelSlotsGetIdle(slots, concurrentCons); + if (!free_slot) + { + failed = true; + goto finish; + } + + run_reindex_command(free_slot->connection, process_type, objname, + echo, verbose, concurrently, true); + + cell = cell->next; + } while (cell != NULL); + + if (!ParallelSlotsWaitCompletion(slots, concurrentCons)) + failed = true; + +finish: + ParallelSlotsTerminate(slots, concurrentCons); + pfree(slots); + + if (failed) + exit(1); +} + +static void +run_reindex_command(PGconn *conn, ReindexType type, const char *name, + bool echo, bool verbose, bool concurrently, bool async) +{ + PQExpBufferData sql; + bool status; + + Assert(name); + /* build the REINDEX query */ initPQExpBuffer(&sql); @@ -344,7 +525,7 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type, { case REINDEX_DATABASE: case REINDEX_SYSTEM: - appendPQExpBufferStr(&sql, fmtId(PQdb(conn))); + appendPQExpBufferStr(&sql, fmtId(name)); break; case REINDEX_INDEX: case REINDEX_TABLE: @@ -358,7 +539,17 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type, /* finish the query */ appendPQExpBufferChar(&sql, ';'); - if (!executeMaintenanceCommand(conn, sql.data, echo)) + if (async) + { + if (echo) + printf("%s\n", sql.data); + + status = PQsendQuery(conn, sql.data) == 1; + } + else + status = executeMaintenanceCommand(conn, sql.data, echo); + 
+ if (!status) { switch (type) { @@ -383,20 +574,141 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type, name, PQdb(conn), PQerrorMessage(conn)); break; } - PQfinish(conn); - exit(1); + if (!async) + { + PQfinish(conn); + exit(1); + } } - PQfinish(conn); termPQExpBuffer(&sql); } +/* + * Prepare the list of objects to process by querying the catalogs. + * + * This function will return a SimpleStringList object containing the entire + * list of tables in the given database that should be processed by a parallel + * database-wide reindex (excluding system tables), or NULL if there's no such + * table. + */ +static SimpleStringList * +get_parallel_object_list(PGconn *conn, ReindexType type, + SimpleStringList *user_list, bool echo) +{ + PQExpBufferData catalog_query; + PQExpBufferData buf; + PGresult *res; + SimpleStringList *tables; + int ntups, + i; + + initPQExpBuffer(&catalog_query); + + /* + * The queries here are using a safe search_path, so there's no need to + * fully qualify everything. + */ + switch (type) + { + case REINDEX_DATABASE: + Assert(user_list == NULL); + appendPQExpBuffer(&catalog_query, + "SELECT c.relname, ns.nspname\n" + " FROM pg_catalog.pg_class c\n" + " JOIN pg_catalog.pg_namespace ns" + " ON c.relnamespace = ns.oid\n" + " WHERE ns.nspname != 'pg_catalog'\n" + " AND c.relkind IN (" + CppAsString2(RELKIND_RELATION) ", " + CppAsString2(RELKIND_MATVIEW) ")\n" + " ORDER BY c.relpages DESC;"); + break; + + case REINDEX_SCHEMA: + { + SimpleStringListCell *cell; + bool nsp_listed = false; + + Assert(user_list != NULL); + + /* + * All the tables from all the listed schemas are grabbed at + * once. 
+ */ + appendPQExpBuffer(&catalog_query, + "SELECT c.relname, ns.nspname\n" + " FROM pg_catalog.pg_class c\n" + " JOIN pg_catalog.pg_namespace ns" + " ON c.relnamespace = ns.oid\n" + " WHERE c.relkind IN (" + CppAsString2(RELKIND_RELATION) ", " + CppAsString2(RELKIND_MATVIEW) ")\n" + " AND ns.nspname IN ("); + + for (cell = user_list->head; cell; cell = cell->next) + { + const char *nspname = cell->val; + + if (nsp_listed) + appendPQExpBuffer(&catalog_query, ", "); + else + nsp_listed = true; + + appendStringLiteralConn(&catalog_query, nspname, conn); + } + + appendPQExpBuffer(&catalog_query, ")\n" + " ORDER BY c.relpages DESC;"); + } + break; + + case REINDEX_SYSTEM: + case REINDEX_INDEX: + case REINDEX_TABLE: + Assert(false); + break; + } + + res = executeQuery(conn, catalog_query.data, echo); + termPQExpBuffer(&catalog_query); + + /* + * If no rows are returned, there are no matching tables, so we are done. + */ + ntups = PQntuples(res); + if (ntups == 0) + { + PQclear(res); + PQfinish(conn); + return NULL; + } + + tables = pg_malloc0(sizeof(SimpleStringList)); + + /* Build qualified identifiers for each table */ + initPQExpBuffer(&buf); + for (i = 0; i < ntups; i++) + { + appendPQExpBufferStr(&buf, + fmtQualifiedId(PQgetvalue(res, i, 1), + PQgetvalue(res, i, 0))); + + simple_string_list_append(tables, buf.data); + resetPQExpBuffer(&buf); + } + termPQExpBuffer(&buf); + PQclear(res); + + return tables; +} + static void reindex_all_databases(const char *maintenance_db, const char *host, const char *port, const char *username, enum trivalue prompt_password, const char *progname, bool echo, bool quiet, bool verbose, - bool concurrently) + bool concurrently, int concurrentCons) { PGconn *conn; PGresult *result; @@ -423,9 +735,10 @@ reindex_all_databases(const char *maintenance_db, appendPQExpBufferStr(&connstr, "dbname="); appendConnStrVal(&connstr, dbname); - reindex_one_database(NULL, connstr.data, REINDEX_DATABASE, host, + reindex_one_database(connstr.data, 
REINDEX_DATABASE, NULL, host, port, username, prompt_password, - progname, echo, verbose, concurrently); + progname, echo, verbose, concurrently, + concurrentCons); } termPQExpBuffer(&connstr); @@ -444,6 +757,7 @@ help(const char *progname) printf(_(" -d, --dbname=DBNAME database to reindex\n")); printf(_(" -e, --echo show the commands being sent to the server\n")); printf(_(" -i, --index=INDEX recreate specific index(es) only\n")); + printf(_(" -j, --jobs=NUM use this many concurrent connections to reindex\n")); printf(_(" -q, --quiet don't write any messages\n")); printf(_(" -s, --system reindex system catalogs\n")); printf(_(" -S, --schema=SCHEMA reindex specific schema(s) only\n")); diff --git a/src/bin/scripts/t/090_reindexdb.pl b/src/bin/scripts/t/090_reindexdb.pl index 1af8ab70ad..c20ffbd505 100644 --- a/src/bin/scripts/t/090_reindexdb.pl +++ b/src/bin/scripts/t/090_reindexdb.pl @@ -3,7 +3,7 @@ use warnings; use PostgresNode; use TestLib; -use Test::More tests => 34; +use Test::More tests => 44; program_help_ok('reindexdb'); program_version_ok('reindexdb'); @@ -77,3 +77,45 @@ $node->command_ok( $node->command_ok( [qw(reindexdb --echo --system dbname=template1)], 'reindexdb system with connection string'); + +# parallel processing +$node->safe_psql( + 'postgres', q| + CREATE SCHEMA s1; + CREATE TABLE s1.t1(id integer); + CREATE INDEX ON s1.t1(id); + CREATE SCHEMA s2; + CREATE TABLE s2.t2(id integer); + CREATE INDEX ON s2.t2(id); + -- empty schema + CREATE SCHEMA s3; +|); + +$node->command_fails( + [ 'reindexdb', '-j', '2', '-s', 'postgres' ], + 'parallel reindexdb cannot process system catalogs'); +$node->command_fails( + [ 'reindexdb', '-j', '2', '-i', 'i1', 'postgres' ], + 'parallel reindexdb cannot process indexes'); +$node->issues_sql_like( + [ 'reindexdb', '-j', '2', 'postgres' ], + qr/statement:\ REINDEX SYSTEM postgres; +.*statement:\ REINDEX TABLE public\.test1/s, + 'parallel reindexdb for database issues REINDEX SYSTEM first'); +# Note that the 
ordering of the commands is not stable, so the second +# command for s2.t2 is not checked after. +$node->issues_sql_like( + [ 'reindexdb', '-j', '2', '-S', 's1', '-S', 's2', 'postgres' ], + qr/statement:\ REINDEX TABLE s1.t1;/, + 'parallel reindexdb for schemas does a per-table REINDEX'); +$node->command_ok( + ['reindexdb', '-j', '2', '-S', 's3'], + 'parallel reindexdb with empty schema'); +$node->command_checks_all( + [ 'reindexdb', '-j', '2', '--concurrently', '-d', 'postgres' ], + 0, + [qr/^$/], + [ + qr/^reindexdb: warning: cannot reindex system catalogs concurrently, skipping all/s + ], + 'parallel reindexdb for system with --concurrently skips catalogs');