From b0b0d84b3d663a148022e900ebfc164284a95f55 Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Fri, 16 Oct 2015 17:18:05 -0400 Subject: [PATCH] Allow a parallel context to relaunch workers. This may allow some callers to avoid the overhead involved in tearing down a parallel context and then setting up a new one, which means releasing the DSM and then allocating and populating a new one. I suspect we'll want to revise the Gather node to make use of this new capability, but even if not it may be useful elsewhere and requires very little additional code. --- src/backend/access/transam/README.parallel | 5 +++ src/backend/access/transam/parallel.c | 49 ++++++++++++++++++++++ src/include/access/parallel.h | 1 + 3 files changed, 55 insertions(+) diff --git a/src/backend/access/transam/README.parallel b/src/backend/access/transam/README.parallel index 10051863fe..dfcbafabf0 100644 --- a/src/backend/access/transam/README.parallel +++ b/src/backend/access/transam/README.parallel @@ -221,3 +221,8 @@ pattern looks like this: DestroyParallelContext(pcxt); ExitParallelMode(); + +If desired, after WaitForParallelWorkersToFinish() has been called, another +call to LaunchParallelWorkers() can be made using the same parallel context. +Calls to these two functions can be alternated any number of times before +destroying the parallel context. diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 17f9a5ae6e..0085987f32 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -404,6 +404,52 @@ LaunchParallelWorkers(ParallelContext *pcxt) /* We might be running in a short-lived memory context. */ oldcontext = MemoryContextSwitchTo(TopTransactionContext); + /* + * This function can be called for a parallel context for which it has + * already been called previously, but only if all of the old workers + * have already exited. When this case arises, we need to do some extra + * reinitialization. + */ + if (pcxt->nworkers_launched > 0) + { + FixedParallelState *fps; + char *error_queue_space; + + /* Clean out old worker handles. */ + for (i = 0; i < pcxt->nworkers; ++i) + { + if (pcxt->worker[i].error_mqh != NULL) + elog(ERROR, "previously launched worker still alive"); + if (pcxt->worker[i].bgwhandle != NULL) + { + pfree(pcxt->worker[i].bgwhandle); + pcxt->worker[i].bgwhandle = NULL; + } + } + + /* Reset a few bits of fixed parallel state to a clean state. */ + fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED); + fps->workers_attached = 0; + fps->last_xlog_end = 0; + + /* Recreate error queues. */ + error_queue_space = + shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE); + for (i = 0; i < pcxt->nworkers; ++i) + { + char *start; + shm_mq *mq; + + start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE; + mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE); + shm_mq_set_receiver(mq, MyProc); + pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL); + } + + /* Reset number of workers launched. */ + pcxt->nworkers_launched = 0; + } + /* Configure a worker. */ snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d", MyProcPid); @@ -428,8 +474,11 @@ LaunchParallelWorkers(ParallelContext *pcxt) if (!any_registrations_failed && RegisterDynamicBackgroundWorker(&worker, &pcxt->worker[i].bgwhandle)) + { shm_mq_set_handle(pcxt->worker[i].error_mqh, pcxt->worker[i].bgwhandle); + pcxt->nworkers_launched++; + } else { /* diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h index 44f0616cb8..d4b7c5dd75 100644 --- a/src/include/access/parallel.h +++ b/src/include/access/parallel.h @@ -35,6 +35,7 @@ typedef struct ParallelContext dlist_node node; SubTransactionId subid; int nworkers; + int nworkers_launched; parallel_worker_main_type entrypoint; char *library_name; char *function_name;