diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 0a0157a878..54d9ea7be0 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -113,6 +113,9 @@ static FixedParallelState *MyFixedParallelState;
 /* List of active parallel contexts. */
 static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
 
+/* Backend-local copy of data from FixedParallelState. */
+static pid_t ParallelMasterPid;
+
 /*
  * List of internal parallel worker entry points.  We need this for
  * reasons explained in LookupParallelWorkerFunction(), below.
@@ -133,6 +136,7 @@ static const struct
 static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
 static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
 static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname);
+static void ParallelWorkerShutdown(int code, Datum arg);
 
 
 /*
@@ -433,6 +437,11 @@ ReinitializeParallelDSM(ParallelContext *pcxt)
 		WaitForParallelWorkersToFinish(pcxt);
 		WaitForParallelWorkersToExit(pcxt);
 		pcxt->nworkers_launched = 0;
+		if (pcxt->any_message_received)
+		{
+			pfree(pcxt->any_message_received);
+			pcxt->any_message_received = NULL;
+		}
 	}
 
 	/* Reset a few bits of fixed parallel state to a clean state. */
@@ -531,6 +540,14 @@ LaunchParallelWorkers(ParallelContext *pcxt)
 		}
 	}
 
+	/*
+	 * Now that nworkers_launched has taken its final value, we can initialize
+	 * any_message_received.
+	 */
+	if (pcxt->nworkers_launched > 0)
+		pcxt->any_message_received =
+			palloc0(sizeof(bool) * pcxt->nworkers_launched);
+
 	/* Restore previous memory context. */
 	MemoryContextSwitchTo(oldcontext);
 }
@@ -552,6 +569,7 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
 	for (;;)
 	{
 		bool		anyone_alive = false;
+		int			nfinished = 0;
 		int			i;
 
 		/*
@@ -563,7 +581,15 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
 
 		for (i = 0; i < pcxt->nworkers_launched; ++i)
 		{
-			if (pcxt->worker[i].error_mqh != NULL)
+			/*
+			 * If error_mqh is NULL, then the worker has already exited
+			 * cleanly.  If we have received a message through error_mqh from
+			 * the worker, we know it started up cleanly, and therefore we're
+			 * certain to be notified when it exits.
+			 */
+			if (pcxt->worker[i].error_mqh == NULL)
+				++nfinished;
+			else if (pcxt->any_message_received[i])
 			{
 				anyone_alive = true;
 				break;
@@ -571,7 +597,62 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
 		}
 
 		if (!anyone_alive)
-			break;
+		{
+			/* If all workers are known to have finished, we're done. */
+			if (nfinished >= pcxt->nworkers_launched)
+			{
+				Assert(nfinished == pcxt->nworkers_launched);
+				break;
+			}
+
+			/*
+			 * We didn't detect any living workers, but not all workers are
+			 * known to have exited cleanly.  Either not all workers have
+			 * launched yet, or maybe some of them failed to start or
+			 * terminated abnormally.
+			 */
+			for (i = 0; i < pcxt->nworkers_launched; ++i)
+			{
+				pid_t		pid;
+				shm_mq	   *mq;
+
+				/*
+				 * If the worker is BGWH_NOT_YET_STARTED or BGWH_STARTED, we
+				 * should just keep waiting.  If it is BGWH_STOPPED, then
+				 * further investigation is needed.
+				 */
+				if (pcxt->worker[i].error_mqh == NULL ||
+					pcxt->worker[i].bgwhandle == NULL ||
+					GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle,
+										   &pid) != BGWH_STOPPED)
+					continue;
+
+				/*
+				 * Check whether the worker ended up stopped without ever
+				 * attaching to the error queue.  If so, the postmaster was
+				 * unable to fork the worker or it exited without initializing
+				 * properly.  We must throw an error, since the caller may
+				 * have been expecting the worker to do some work before
+				 * exiting.
+				 */
+				mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
+				if (shm_mq_get_sender(mq) == NULL)
+					ereport(ERROR,
+							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							 errmsg("parallel worker failed to initialize"),
+							 errhint("More details may be available in the server log.")));
+
+				/*
+				 * The worker is stopped, but is attached to the error queue.
+				 * Unless there's a bug somewhere, this will only happen when
+				 * the worker writes messages and terminates after the
+				 * CHECK_FOR_INTERRUPTS() near the top of this function and
+				 * before the call to GetBackgroundWorkerPid().  In that case,
+				 * our latch should have been set as well and the right things
+				 * will happen on the next pass through the loop.
+				 */
+			}
+		}
 
 		WaitLatch(MyLatch, WL_LATCH_SET, -1,
 				  WAIT_EVENT_PARALLEL_FINISH);
@@ -828,6 +909,9 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
 {
 	char		msgtype;
 
+	if (pcxt->any_message_received != NULL)
+		pcxt->any_message_received[i] = true;
+
 	msgtype = pq_getmsgbyte(msg);
 
 	switch (msgtype)
@@ -1024,11 +1108,16 @@ ParallelWorkerMain(Datum main_arg)
 	fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false);
 	MyFixedParallelState = fps;
 
+	/* Arrange to signal the leader if we exit. */
+	ParallelMasterPid = fps->parallel_master_pid;
+	ParallelMasterBackendId = fps->parallel_master_backend_id;
+	on_shmem_exit(ParallelWorkerShutdown, (Datum) 0);
+
 	/*
-	 * Now that we have a worker number, we can find and attach to the error
-	 * queue provided for us.  That's good, because until we do that, any
-	 * errors that happen here will not be reported back to the process that
-	 * requested that this worker be launched.
+	 * Now we can find and attach to the error queue provided for us.  That's
+	 * good, because until we do that, any errors that happen here will not be
+	 * reported back to the process that requested that this worker be
+	 * launched.
 	 */
 	error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
 	mq = (shm_mq *) (error_queue_space +
@@ -1146,9 +1235,6 @@ ParallelWorkerMain(Datum main_arg)
 	SetTempNamespaceState(fps->temp_namespace_id,
 						  fps->temp_toast_namespace_id);
 
-	/* Set ParallelMasterBackendId so we know how to address temp relations. */
-	ParallelMasterBackendId = fps->parallel_master_backend_id;
-
 	/* Restore reindex state. */
 	reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
 	RestoreReindexState(reindexspace);
@@ -1197,6 +1283,20 @@ ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
 	SpinLockRelease(&fps->mutex);
 }
 
+/*
+ * Make sure the leader tries to read from our error queue one more time.
+ * This guards against the case where we exit uncleanly without sending an
+ * ErrorResponse to the leader, for example because some code calls proc_exit
+ * directly.
+ */
+static void
+ParallelWorkerShutdown(int code, Datum arg)
+{
+	SendProcSignal(ParallelMasterPid,
+				   PROCSIG_PARALLEL_MESSAGE,
+				   ParallelMasterBackendId);
+}
+
 /*
  * Look up (and possibly load) a parallel worker entry point function.
  *
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
index 8c6a747ced..32c2e32bea 100644
--- a/src/include/access/parallel.h
+++ b/src/include/access/parallel.h
@@ -43,6 +43,7 @@ typedef struct ParallelContext
 	void	   *private_memory;
 	shm_toc    *toc;
 	ParallelWorkerInfo *worker;
+	bool	   *any_message_received;
 } ParallelContext;
 
 typedef struct ParallelWorkerContext
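
For reviewers, the sketch below shows the leader-side call sequence whose
behavior this patch changes.  It is illustrative only: the module name
"sketch", the functions sketch_worker_main() and sketch_run_in_parallel(),
and the use of the v11-era CreateParallelContext() signature (library name,
entrypoint name, worker count, serializable_okay flag) are assumptions made
for the example, not part of the patch.

/* sketch.c -- hypothetical loadable module, built as "sketch.so" */
#include "postgres.h"

#include "access/parallel.h"
#include "access/xact.h"
#include "fmgr.h"

PG_MODULE_MAGIC;

/* Trivial worker entrypoint, looked up by name in this library. */
PGDLLEXPORT void sketch_worker_main(dsm_segment *seg, shm_toc *toc);

void
sketch_worker_main(dsm_segment *seg, shm_toc *toc)
{
	/* A real worker would attach to shared state through the toc here. */
}

/*
 * Launch nworkers copies of sketch_worker_main and wait for them all.
 * Before this patch, a worker that the postmaster could not fork (or that
 * exited before attaching to its error queue) could leave the leader in
 * WaitForParallelWorkersToFinish() forever; with the patch, that call
 * raises "parallel worker failed to initialize" instead.
 */
static void
sketch_run_in_parallel(int nworkers)
{
	ParallelContext *pcxt;

	EnterParallelMode();
	pcxt = CreateParallelContext("sketch", "sketch_worker_main",
								 nworkers, false);
	InitializeParallelDSM(pcxt);
	LaunchParallelWorkers(pcxt);
	WaitForParallelWorkersToFinish(pcxt);
	DestroyParallelContext(pcxt);
	ExitParallelMode();
}

Note that LaunchParallelWorkers() may register fewer workers than requested,
so nworkers_launched reaches its final value only when it returns; that is
why the patch allocates any_message_received (sized by nworkers_launched) at
the end of LaunchParallelWorkers() rather than in InitializeParallelDSM().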