diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c index ba31ac8ed7..07821a0cfd 100644 --- a/src/bin/pg_dump/parallel.c +++ b/src/bin/pg_dump/parallel.c @@ -42,6 +42,7 @@ * * In the master process, the workerStatus field for each worker has one of * the following values: + * WRKR_NOT_STARTED: we've not yet forked this worker * WRKR_IDLE: it's waiting for a command * WRKR_WORKING: it's working on a command * WRKR_TERMINATED: process ended @@ -76,11 +77,15 @@ /* Worker process statuses */ typedef enum { + WRKR_NOT_STARTED = 0, WRKR_IDLE, WRKR_WORKING, WRKR_TERMINATED } T_WorkerStatus; +#define WORKER_IS_RUNNING(workerStatus) \ + ((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING) + /* * Private per-parallel-worker state (typedef for this is in parallel.h). * @@ -417,7 +422,9 @@ ShutdownWorkersHard(ParallelState *pstate) /* * Close our write end of the sockets so that any workers waiting for - * commands know they can exit. + * commands know they can exit. (Note: some of the pipeWrite fields might + * still be zero, if we failed to initialize all the workers. Hence, just + * ignore errors here.) */ for (i = 0; i < pstate->numWorkers; i++) closesocket(pstate->parallelSlot[i].pipeWrite); @@ -491,7 +498,7 @@ WaitForTerminatingWorkers(ParallelState *pstate) for (j = 0; j < pstate->numWorkers; j++) { - if (pstate->parallelSlot[j].workerStatus != WRKR_TERMINATED) + if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus)) { lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread; nrun++; @@ -927,6 +934,7 @@ ParallelBackupStart(ArchiveHandle *AH) if (AH->public.numWorkers == 1) return pstate; + /* Create status arrays, being sure to initialize all fields to 0 */ pstate->te = (TocEntry **) pg_malloc0(pstate->numWorkers * sizeof(TocEntry *)); pstate->parallelSlot = (ParallelSlot *) @@ -976,13 +984,6 @@ ParallelBackupStart(ArchiveHandle *AH) "could not create communication channels: %s\n", strerror(errno)); - pstate->te[i] = NULL; /* just for safety */ - - slot->workerStatus = WRKR_IDLE; - slot->AH = NULL; - slot->callback = NULL; - slot->callback_data = NULL; - /* master's ends of the pipes */ slot->pipeRead = pipeWM[PIPE_READ]; slot->pipeWrite = pipeMW[PIPE_WRITE]; @@ -1000,6 +1001,7 @@ ParallelBackupStart(ArchiveHandle *AH) handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32, wi, 0, &(slot->threadId)); slot->hThread = handle; + slot->workerStatus = WRKR_IDLE; #else /* !WIN32 */ pid = fork(); if (pid == 0) @@ -1044,6 +1046,7 @@ ParallelBackupStart(ArchiveHandle *AH) /* In Master after successful fork */ slot->pid = pid; + slot->workerStatus = WRKR_IDLE; /* close read end of Master -> Worker */ closesocket(pipeMW[PIPE_READ]); @@ -1273,7 +1276,7 @@ GetIdleWorker(ParallelState *pstate) } /* - * Return true iff every worker is in the WRKR_TERMINATED state. + * Return true iff no worker is running. */ static bool HasEveryWorkerTerminated(ParallelState *pstate) @@ -1282,7 +1285,7 @@ HasEveryWorkerTerminated(ParallelState *pstate) for (i = 0; i < pstate->numWorkers; i++) { - if (pstate->parallelSlot[i].workerStatus != WRKR_TERMINATED) + if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus)) return false; } return true; @@ -1618,7 +1621,7 @@ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker) FD_ZERO(&workerset); for (i = 0; i < pstate->numWorkers; i++) { - if (pstate->parallelSlot[i].workerStatus == WRKR_TERMINATED) + if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus)) continue; FD_SET(pstate->parallelSlot[i].pipeRead, &workerset); if (pstate->parallelSlot[i].pipeRead > maxFd) @@ -1643,6 +1646,8 @@ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker) { char *msg; + if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus)) + continue; if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset)) continue;