diff --git a/src/bin/scripts/reindexdb.c b/src/bin/scripts/reindexdb.c index cf28176243..fc0681538a 100644 --- a/src/bin/scripts/reindexdb.c +++ b/src/bin/scripts/reindexdb.c @@ -36,7 +36,7 @@ static SimpleStringList *get_parallel_object_list(PGconn *conn, ReindexType type, SimpleStringList *user_list, bool echo); -static void reindex_one_database(const ConnParams *cparams, ReindexType type, +static void reindex_one_database(ConnParams *cparams, ReindexType type, SimpleStringList *user_list, const char *progname, bool echo, bool verbose, bool concurrently, @@ -330,7 +330,7 @@ main(int argc, char *argv[]) } static void -reindex_one_database(const ConnParams *cparams, ReindexType type, +reindex_one_database(ConnParams *cparams, ReindexType type, SimpleStringList *user_list, const char *progname, bool echo, bool verbose, bool concurrently, int concurrentCons, @@ -341,7 +341,7 @@ reindex_one_database(const ConnParams *cparams, ReindexType type, bool parallel = concurrentCons > 1; SimpleStringList *process_list = user_list; ReindexType process_type = type; - ParallelSlot *slots; + ParallelSlotArray *sa; bool failed = false; int items_count = 0; @@ -461,7 +461,8 @@ reindex_one_database(const ConnParams *cparams, ReindexType type, Assert(process_list != NULL); - slots = ParallelSlotsSetup(cparams, progname, echo, conn, concurrentCons); + sa = ParallelSlotsSetup(concurrentCons, cparams, progname, echo, NULL); + ParallelSlotsAdoptConn(sa, conn); cell = process_list->head; do @@ -475,7 +476,7 @@ reindex_one_database(const ConnParams *cparams, ReindexType type, goto finish; } - free_slot = ParallelSlotsGetIdle(slots, concurrentCons); + free_slot = ParallelSlotsGetIdle(sa, NULL); if (!free_slot) { failed = true; @@ -489,7 +490,7 @@ reindex_one_database(const ConnParams *cparams, ReindexType type, cell = cell->next; } while (cell != NULL); - if (!ParallelSlotsWaitCompletion(slots, concurrentCons)) + if (!ParallelSlotsWaitCompletion(sa)) failed = true; finish: @@ -499,8 +500,8 @@ finish: pg_free(process_list); } - ParallelSlotsTerminate(slots, concurrentCons); - pfree(slots); + ParallelSlotsTerminate(sa); + pfree(sa); if (failed) exit(1); diff --git a/src/bin/scripts/vacuumdb.c b/src/bin/scripts/vacuumdb.c index 602fd45c42..7901c41f16 100644 --- a/src/bin/scripts/vacuumdb.c +++ b/src/bin/scripts/vacuumdb.c @@ -45,7 +45,7 @@ typedef struct vacuumingOptions } vacuumingOptions; -static void vacuum_one_database(const ConnParams *cparams, +static void vacuum_one_database(ConnParams *cparams, vacuumingOptions *vacopts, int stage, SimpleStringList *tables, @@ -408,7 +408,7 @@ main(int argc, char *argv[]) * a list of tables from the database. */ static void -vacuum_one_database(const ConnParams *cparams, +vacuum_one_database(ConnParams *cparams, vacuumingOptions *vacopts, int stage, SimpleStringList *tables, @@ -421,13 +421,14 @@ vacuum_one_database(const ConnParams *cparams, PGresult *res; PGconn *conn; SimpleStringListCell *cell; - ParallelSlot *slots; + ParallelSlotArray *sa; SimpleStringList dbtables = {NULL, NULL}; int i; int ntups; bool failed = false; bool tables_listed = false; bool has_where = false; + const char *initcmd; const char *stage_commands[] = { "SET default_statistics_target=1; SET vacuum_cost_delay=0;", "SET default_statistics_target=10; RESET vacuum_cost_delay;", @@ -683,27 +684,26 @@ vacuum_one_database(const ConnParams *cparams, if (concurrentCons <= 0) concurrentCons = 1; + /* + * All slots need to be prepared to run the appropriate analyze stage, if + * caller requested that mode. We have to prepare the initial connection + * ourselves before setting up the slots. + */ + if (stage == ANALYZE_NO_STAGE) + initcmd = NULL; + else + { + initcmd = stage_commands[stage]; + executeCommand(conn, initcmd, echo); + } + /* * Setup the database connections. We reuse the connection we already have * for the first slot. If not in parallel mode, the first slot in the * array contains the connection. */ - slots = ParallelSlotsSetup(cparams, progname, echo, conn, concurrentCons); - - /* - * Prepare all the connections to run the appropriate analyze stage, if - * caller requested that mode. - */ - if (stage != ANALYZE_NO_STAGE) - { - int j; - - /* We already emitted the message above */ - - for (j = 0; j < concurrentCons; j++) - executeCommand((slots + j)->connection, - stage_commands[stage], echo); - } + sa = ParallelSlotsSetup(concurrentCons, cparams, progname, echo, initcmd); + ParallelSlotsAdoptConn(sa, conn); initPQExpBuffer(&sql); @@ -719,7 +719,7 @@ vacuum_one_database(const ConnParams *cparams, goto finish; } - free_slot = ParallelSlotsGetIdle(slots, concurrentCons); + free_slot = ParallelSlotsGetIdle(sa, NULL); if (!free_slot) { failed = true; @@ -740,12 +740,12 @@ vacuum_one_database(const ConnParams *cparams, cell = cell->next; } while (cell != NULL); - if (!ParallelSlotsWaitCompletion(slots, concurrentCons)) + if (!ParallelSlotsWaitCompletion(sa)) failed = true; finish: - ParallelSlotsTerminate(slots, concurrentCons); - pg_free(slots); + ParallelSlotsTerminate(sa); + pg_free(sa); termPQExpBuffer(&sql); diff --git a/src/fe_utils/parallel_slot.c b/src/fe_utils/parallel_slot.c index b625deb254..69581157c2 100644 --- a/src/fe_utils/parallel_slot.c +++ b/src/fe_utils/parallel_slot.c @@ -25,25 +25,16 @@ #include "common/logging.h" #include "fe_utils/cancel.h" #include "fe_utils/parallel_slot.h" +#include "fe_utils/query_utils.h" #define ERRCODE_UNDEFINED_TABLE "42P01" -static void init_slot(ParallelSlot *slot, PGconn *conn); static int select_loop(int maxFd, fd_set *workerset); static bool processQueryResult(ParallelSlot *slot, PGresult *result); -static void -init_slot(ParallelSlot *slot, PGconn *conn) -{ - slot->connection = conn; - /* Initially assume connection is idle */ - slot->isFree = true; - ParallelSlotClearHandler(slot); -} - /* * Process (and delete) a query result. Returns true if there's no problem, - * false otherwise. It's up to the handler to decide what cosntitutes a + * false otherwise. It's up to the handler to decide what constitutes a * problem. */ static bool @@ -136,152 +127,317 @@ select_loop(int maxFd, fd_set *workerset) return i; } +/* + * Return the offset of a suitable idle slot, or -1 if none are available. If + * the given dbname is not null, only idle slots connected to the given + * database are considered suitable, otherwise all idle connected slots are + * considered suitable. + */ +static int +find_matching_idle_slot(const ParallelSlotArray *sa, const char *dbname) +{ + int i; + + for (i = 0; i < sa->numslots; i++) + { + if (sa->slots[i].inUse) + continue; + + if (sa->slots[i].connection == NULL) + continue; + + if (dbname == NULL || + strcmp(PQdb(sa->slots[i].connection), dbname) == 0) + return i; + } + return -1; +} + +/* + * Return the offset of the first slot without a database connection, or -1 if + * all slots are connected. + */ +static int +find_unconnected_slot(const ParallelSlotArray *sa) +{ + int i; + + for (i = 0; i < sa->numslots; i++) + { + if (sa->slots[i].inUse) + continue; + + if (sa->slots[i].connection == NULL) + return i; + } + + return -1; +} + +/* + * Return the offset of the first idle slot, or -1 if all slots are busy. + */ +static int +find_any_idle_slot(const ParallelSlotArray *sa) +{ + int i; + + for (i = 0; i < sa->numslots; i++) + if (!sa->slots[i].inUse) + return i; + + return -1; +} + +/* + * Wait for any slot's connection to have query results, consume the results, + * and update the slot's status as appropriate. Returns true on success, + * false on cancellation, on error, or if no slots are connected. + */ +static bool +wait_on_slots(ParallelSlotArray *sa) +{ + int i; + fd_set slotset; + int maxFd = 0; + PGconn *cancelconn = NULL; + + /* We must reconstruct the fd_set for each call to select_loop */ + FD_ZERO(&slotset); + + for (i = 0; i < sa->numslots; i++) + { + int sock; + + /* We shouldn't get here if we still have slots without connections */ + Assert(sa->slots[i].connection != NULL); + + sock = PQsocket(sa->slots[i].connection); + + /* + * We don't really expect any connections to lose their sockets after + * startup, but just in case, cope by ignoring them. + */ + if (sock < 0) + continue; + + /* Keep track of the first valid connection we see. */ + if (cancelconn == NULL) + cancelconn = sa->slots[i].connection; + + FD_SET(sock, &slotset); + if (sock > maxFd) + maxFd = sock; + } + + /* + * If we get this far with no valid connections, processing cannot + * continue. + */ + if (cancelconn == NULL) + return false; + + SetCancelConn(sa->slots->connection); + i = select_loop(maxFd, &slotset); + ResetCancelConn(); + + /* failure? */ + if (i < 0) + return false; + + for (i = 0; i < sa->numslots; i++) + { + int sock; + + sock = PQsocket(sa->slots[i].connection); + + if (sock >= 0 && FD_ISSET(sock, &slotset)) + { + /* select() says input is available, so consume it */ + PQconsumeInput(sa->slots[i].connection); + } + + /* Collect result(s) as long as any are available */ + while (!PQisBusy(sa->slots[i].connection)) + { + PGresult *result = PQgetResult(sa->slots[i].connection); + + if (result != NULL) + { + /* Handle and discard the command result */ + if (!processQueryResult(&sa->slots[i], result)) + return false; + } + else + { + /* This connection has become idle */ + sa->slots[i].inUse = false; + ParallelSlotClearHandler(&sa->slots[i]); + break; + } + } + } + return true; +} + +/* + * Open a new database connection using the stored connection parameters and + * optionally a given dbname if not null, execute the stored initial command if + * any, and associate the new connection with the given slot. + */ +static void +connect_slot(ParallelSlotArray *sa, int slotno, const char *dbname) +{ + const char *old_override; + ParallelSlot *slot = &sa->slots[slotno]; + + old_override = sa->cparams->override_dbname; + if (dbname) + sa->cparams->override_dbname = dbname; + slot->connection = connectDatabase(sa->cparams, sa->progname, sa->echo, false, true); + sa->cparams->override_dbname = old_override; + + if (PQsocket(slot->connection) >= FD_SETSIZE) + { + pg_log_fatal("too many jobs for this platform"); + exit(1); + } + + /* Setup the connection using the supplied command, if any. */ + if (sa->initcmd) + executeCommand(slot->connection, sa->initcmd, sa->echo); +} + /* * ParallelSlotsGetIdle * Return a connection slot that is ready to execute a command. * - * This returns the first slot we find that is marked isFree, if one is; - * otherwise, we loop on select() until one socket becomes available. When - * this happens, we read the whole set and mark as free all sockets that - * become available. If an error occurs, NULL is returned. + * The slot returned is chosen as follows: + * + * If any idle slot already has an open connection, and if either dbname is + * null or the existing connection is to the given database, that slot will be + * returned allowing the connection to be reused. + * + * Otherwise, if any idle slot is not yet connected to any database, the slot + * will be returned with it's connection opened using the stored cparams and + * optionally the given dbname if not null. + * + * Otherwise, if any idle slot exists, an idle slot will be chosen and returned + * after having it's connection disconnected and reconnected using the stored + * cparams and optionally the given dbname if not null. + * + * Otherwise, if any slots have connections that are busy, we loop on select() + * until one socket becomes available. When this happens, we read the whole + * set and mark as free all sockets that become available. We then select a + * slot using the same rules as above. + * + * Otherwise, we cannot return a slot, which is an error, and NULL is returned. + * + * For any connection created, if the stored initcmd is not null, it will be + * executed as a command on the newly formed connection before the slot is + * returned. + * + * If an error occurs, NULL is returned. */ ParallelSlot * -ParallelSlotsGetIdle(ParallelSlot *slots, int numslots) +ParallelSlotsGetIdle(ParallelSlotArray *sa, const char *dbname) { - int i; - int firstFree = -1; + int offset; - /* - * Look for any connection currently free. If there is one, mark it as - * taken and let the caller know the slot to use. - */ - for (i = 0; i < numslots; i++) + Assert(sa); + Assert(sa->numslots > 0); + + while (1) { - if (slots[i].isFree) + /* First choice: a slot already connected to the desired database. */ + offset = find_matching_idle_slot(sa, dbname); + if (offset >= 0) { - slots[i].isFree = false; - return slots + i; - } - } - - /* - * No free slot found, so wait until one of the connections has finished - * its task and return the available slot. - */ - while (firstFree < 0) - { - fd_set slotset; - int maxFd = 0; - - /* We must reconstruct the fd_set for each call to select_loop */ - FD_ZERO(&slotset); - - for (i = 0; i < numslots; i++) - { - int sock = PQsocket(slots[i].connection); - - /* - * We don't really expect any connections to lose their sockets - * after startup, but just in case, cope by ignoring them. - */ - if (sock < 0) - continue; - - FD_SET(sock, &slotset); - if (sock > maxFd) - maxFd = sock; + sa->slots[offset].inUse = true; + return &sa->slots[offset]; } - SetCancelConn(slots->connection); - i = select_loop(maxFd, &slotset); - ResetCancelConn(); + /* Second choice: a slot not connected to any database. */ + offset = find_unconnected_slot(sa); + if (offset >= 0) + { + connect_slot(sa, offset, dbname); + sa->slots[offset].inUse = true; + return &sa->slots[offset]; + } - /* failure? */ - if (i < 0) + /* Third choice: a slot connected to the wrong database. */ + offset = find_any_idle_slot(sa); + if (offset >= 0) + { + disconnectDatabase(sa->slots[offset].connection); + sa->slots[offset].connection = NULL; + connect_slot(sa, offset, dbname); + sa->slots[offset].inUse = true; + return &sa->slots[offset]; + } + + /* + * Fourth choice: block until one or more slots become available. If + * any slots hit a fatal error, we'll find out about that here and + * return NULL. + */ + if (!wait_on_slots(sa)) return NULL; - - for (i = 0; i < numslots; i++) - { - int sock = PQsocket(slots[i].connection); - - if (sock >= 0 && FD_ISSET(sock, &slotset)) - { - /* select() says input is available, so consume it */ - PQconsumeInput(slots[i].connection); - } - - /* Collect result(s) as long as any are available */ - while (!PQisBusy(slots[i].connection)) - { - PGresult *result = PQgetResult(slots[i].connection); - - if (result != NULL) - { - /* Handle and discard the command result */ - if (!processQueryResult(slots + i, result)) - return NULL; - } - else - { - /* This connection has become idle */ - slots[i].isFree = true; - ParallelSlotClearHandler(slots + i); - if (firstFree < 0) - firstFree = i; - break; - } - } - } } - - slots[firstFree].isFree = false; - return slots + firstFree; } /* * ParallelSlotsSetup - * Prepare a set of parallel slots to use on a given database. + * Prepare a set of parallel slots but do not connect to any database. * - * This creates and initializes a set of connections to the database - * using the information given by the caller, marking all parallel slots - * as free and ready to use. "conn" is an initial connection set up - * by the caller and is associated with the first slot in the parallel - * set. + * This creates and initializes a set of slots, marking all parallel slots as + * free and ready to use. Establishing connections is delayed until requesting + * a free slot. The cparams, progname, echo, and initcmd are stored for later + * use and must remain valid for the lifetime of the returned array. */ -ParallelSlot * -ParallelSlotsSetup(const ConnParams *cparams, - const char *progname, bool echo, - PGconn *conn, int numslots) +ParallelSlotArray * +ParallelSlotsSetup(int numslots, ConnParams *cparams, const char *progname, + bool echo, const char *initcmd) { - ParallelSlot *slots; - int i; + ParallelSlotArray *sa; - Assert(conn != NULL); + Assert(numslots > 0); + Assert(cparams != NULL); + Assert(progname != NULL); - slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * numslots); - init_slot(slots, conn); - if (numslots > 1) - { - for (i = 1; i < numslots; i++) - { - conn = connectDatabase(cparams, progname, echo, false, true); + sa = (ParallelSlotArray *) palloc0(offsetof(ParallelSlotArray, slots) + + numslots * sizeof(ParallelSlot)); - /* - * Fail and exit immediately if trying to use a socket in an - * unsupported range. POSIX requires open(2) to use the lowest - * unused file descriptor and the hint given relies on that. - */ - if (PQsocket(conn) >= FD_SETSIZE) - { - pg_log_fatal("too many jobs for this platform -- try %d", i); - exit(1); - } + sa->numslots = numslots; + sa->cparams = cparams; + sa->progname = progname; + sa->echo = echo; + sa->initcmd = initcmd; - init_slot(slots + i, conn); - } - } + return sa; +} - return slots; +/* + * ParallelSlotsAdoptConn + * Assign an open connection to the slots array for reuse. + * + * This turns over ownership of an open connection to a slots array. The + * caller should not further use or close the connection. All the connection's + * parameters (user, host, port, etc.) except possibly dbname should match + * those of the slots array's cparams, as given in ParallelSlotsSetup. If + * these parameters differ, subsequent behavior is undefined. + */ +void +ParallelSlotsAdoptConn(ParallelSlotArray *sa, PGconn *conn) +{ + int offset; + + offset = find_unconnected_slot(sa); + if (offset >= 0) + sa->slots[offset].connection = conn; + else + disconnectDatabase(conn); } /* @@ -292,13 +448,13 @@ ParallelSlotsSetup(const ConnParams *cparams, * terminate all connections. */ void -ParallelSlotsTerminate(ParallelSlot *slots, int numslots) +ParallelSlotsTerminate(ParallelSlotArray *sa) { int i; - for (i = 0; i < numslots; i++) + for (i = 0; i < sa->numslots; i++) { - PGconn *conn = slots[i].connection; + PGconn *conn = sa->slots[i].connection; if (conn == NULL) continue; @@ -314,13 +470,15 @@ ParallelSlotsTerminate(ParallelSlot *slots, int numslots) * error has been found on the way. */ bool -ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots) +ParallelSlotsWaitCompletion(ParallelSlotArray *sa) { int i; - for (i = 0; i < numslots; i++) + for (i = 0; i < sa->numslots; i++) { - if (!consumeQueryResult(slots + i)) + if (sa->slots[i].connection == NULL) + continue; + if (!consumeQueryResult(&sa->slots[i])) return false; } @@ -350,6 +508,9 @@ ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots) bool TableCommandResultHandler(PGresult *res, PGconn *conn, void *context) { + Assert(res != NULL); + Assert(conn != NULL); + /* * If it's an error, report it. Errors about a missing table are harmless * so we continue processing; but die for other errors. diff --git a/src/include/fe_utils/parallel_slot.h b/src/include/fe_utils/parallel_slot.h index 8902f8d4f4..b7e2b0a29b 100644 --- a/src/include/fe_utils/parallel_slot.h +++ b/src/include/fe_utils/parallel_slot.h @@ -21,7 +21,7 @@ typedef bool (*ParallelSlotResultHandler) (PGresult *res, PGconn *conn, typedef struct ParallelSlot { PGconn *connection; /* One connection */ - bool isFree; /* Is it known to be idle? */ + bool inUse; /* Is the slot being used? */ /* * Prior to issuing a command or query on 'connection', a handler callback @@ -33,6 +33,16 @@ typedef struct ParallelSlot void *handler_context; } ParallelSlot; +typedef struct ParallelSlotArray +{ + int numslots; + ConnParams *cparams; + const char *progname; + bool echo; + const char *initcmd; + ParallelSlot slots[FLEXIBLE_ARRAY_MEMBER]; +} ParallelSlotArray; + static inline void ParallelSlotSetHandler(ParallelSlot *slot, ParallelSlotResultHandler handler, void *context) @@ -48,15 +58,18 @@ ParallelSlotClearHandler(ParallelSlot *slot) slot->handler_context = NULL; } -extern ParallelSlot *ParallelSlotsGetIdle(ParallelSlot *slots, int numslots); +extern ParallelSlot *ParallelSlotsGetIdle(ParallelSlotArray *slots, + const char *dbname); -extern ParallelSlot *ParallelSlotsSetup(const ConnParams *cparams, - const char *progname, bool echo, - PGconn *conn, int numslots); +extern ParallelSlotArray *ParallelSlotsSetup(int numslots, ConnParams *cparams, + const char *progname, bool echo, + const char *initcmd); -extern void ParallelSlotsTerminate(ParallelSlot *slots, int numslots); +extern void ParallelSlotsAdoptConn(ParallelSlotArray *sa, PGconn *conn); -extern bool ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots); +extern void ParallelSlotsTerminate(ParallelSlotArray *sa); + +extern bool ParallelSlotsWaitCompletion(ParallelSlotArray *sa); extern bool TableCommandResultHandler(PGresult *res, PGconn *conn, void *context); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 574a8a94fa..e017557e3e 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -404,6 +404,7 @@ ConfigData ConfigVariable ConnCacheEntry ConnCacheKey +ConnParams ConnStatusType ConnType ConnectionStateEnum @@ -1730,6 +1731,7 @@ ParallelHashJoinState ParallelIndexScanDesc ParallelReadyList ParallelSlot +ParallelSlotArray ParallelState ParallelTableScanDesc ParallelTableScanDescData