diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c index 0e2bfa106a..5630dc626d 100644 --- a/src/bin/pg_dump/parallel.c +++ b/src/bin/pg_dump/parallel.c @@ -45,6 +45,8 @@ * WRKR_IDLE: it's waiting for a command * WRKR_WORKING: it's working on a command * WRKR_TERMINATED: process ended + * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING + * state, and must be NULL in other states. */ #include "postgres_fe.h" @@ -71,6 +73,45 @@ #define NO_SLOT (-1) /* Failure result for GetIdleWorker() */ +/* Worker process statuses */ +typedef enum +{ + WRKR_IDLE, + WRKR_WORKING, + WRKR_TERMINATED +} T_WorkerStatus; + +/* + * Private per-parallel-worker state (typedef for this is in parallel.h). + * + * Much of this is valid only in the master process (or, on Windows, should + * be touched only by the master thread). But the AH field should be touched + * only by workers. The pipe descriptors are valid everywhere. + */ +struct ParallelSlot +{ + T_WorkerStatus workerStatus; /* see enum above */ + + /* These fields are valid if workerStatus == WRKR_WORKING: */ + ParallelCompletionPtr callback; /* function to call on completion */ + void *callback_data; /* passthru data for it */ + + ArchiveHandle *AH; /* Archive data worker is using */ + + int pipeRead; /* master's end of the pipes */ + int pipeWrite; + int pipeRevRead; /* child's end of the pipes */ + int pipeRevWrite; + + /* Child process/thread identity info: */ +#ifdef WIN32 + uintptr_t hThread; + unsigned int threadId; +#else + pid_t pid; +#endif +}; + #ifdef WIN32 /* @@ -475,9 +516,10 @@ WaitForTerminatingWorkers(ParallelState *pstate) } #endif /* WIN32 */ - /* On all platforms, update workerStatus as well */ + /* On all platforms, update workerStatus and te[] as well */ Assert(j < pstate->numWorkers); slot->workerStatus = WRKR_TERMINATED; + pstate->te[j] = NULL; } } @@ -870,20 +912,22 @@ ParallelBackupStart(ArchiveHandle *AH) { ParallelState *pstate; int i; - const size_t slotSize = AH->public.numWorkers * sizeof(ParallelSlot); Assert(AH->public.numWorkers > 0); pstate = (ParallelState *) pg_malloc(sizeof(ParallelState)); pstate->numWorkers = AH->public.numWorkers; + pstate->te = NULL; pstate->parallelSlot = NULL; if (AH->public.numWorkers == 1) return pstate; - pstate->parallelSlot = (ParallelSlot *) pg_malloc(slotSize); - memset((void *) pstate->parallelSlot, 0, slotSize); + pstate->te = (TocEntry **) + pg_malloc0(pstate->numWorkers * sizeof(TocEntry *)); + pstate->parallelSlot = (ParallelSlot *) + pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot)); #ifdef WIN32 /* Make fmtId() and fmtQualifiedId() use thread-local storage */ @@ -929,9 +973,10 @@ ParallelBackupStart(ArchiveHandle *AH) "could not create communication channels: %s\n", strerror(errno)); + pstate->te[i] = NULL; /* just for safety */ + slot->workerStatus = WRKR_IDLE; slot->AH = NULL; - slot->te = NULL; slot->callback = NULL; slot->callback_data = NULL; @@ -1062,6 +1107,7 @@ ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate) set_cancel_pstate(NULL); /* Release state (mere neatnik-ism, since we're about to terminate) */ + free(pstate->te); free(pstate->parallelSlot); free(pstate); } @@ -1201,9 +1247,9 @@ DispatchJobForTocEntry(ArchiveHandle *AH, /* Remember worker is busy, and which TocEntry it's working on */ pstate->parallelSlot[worker].workerStatus = WRKR_WORKING; - pstate->parallelSlot[worker].te = te; pstate->parallelSlot[worker].callback = callback; pstate->parallelSlot[worker].callback_data = callback_data; + pstate->te[worker] = te; } /* @@ -1394,13 +1440,13 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait) if (messageStartsWith(msg, "OK ")) { ParallelSlot *slot = &pstate->parallelSlot[worker]; - TocEntry *te = slot->te; + TocEntry *te = pstate->te[worker]; int status; status = parseWorkerResponse(AH, te, msg); slot->callback(AH, te, status, slot->callback_data); slot->workerStatus = WRKR_IDLE; - slot->te = NULL; + pstate->te[worker] = NULL; } else exit_horribly(modulename, diff --git a/src/bin/pg_dump/parallel.h b/src/bin/pg_dump/parallel.h index 8ee629b106..e0c442cf37 100644 --- a/src/bin/pg_dump/parallel.h +++ b/src/bin/pg_dump/parallel.h @@ -33,51 +33,16 @@ typedef enum WFW_ALL_IDLE } WFW_WaitOption; -/* Worker process statuses */ -typedef enum -{ - WRKR_IDLE, - WRKR_WORKING, - WRKR_TERMINATED -} T_WorkerStatus; - -/* - * Per-parallel-worker state of parallel.c. - * - * Much of this is valid only in the master process (or, on Windows, should - * be touched only by the master thread). But the AH field should be touched - * only by workers. The pipe descriptors are valid everywhere. - */ -typedef struct ParallelSlot -{ - T_WorkerStatus workerStatus; /* see enum above */ - - /* These fields are valid if workerStatus == WRKR_WORKING: */ - TocEntry *te; /* item being worked on */ - ParallelCompletionPtr callback; /* function to call on completion */ - void *callback_data; /* passthru data for it */ - - ArchiveHandle *AH; /* Archive data worker is using */ - - int pipeRead; /* master's end of the pipes */ - int pipeWrite; - int pipeRevRead; /* child's end of the pipes */ - int pipeRevWrite; - - /* Child process/thread identity info: */ -#ifdef WIN32 - uintptr_t hThread; - unsigned int threadId; -#else - pid_t pid; -#endif -} ParallelSlot; +/* ParallelSlot is an opaque struct known only within parallel.c */ +typedef struct ParallelSlot ParallelSlot; /* Overall state for parallel.c */ typedef struct ParallelState { int numWorkers; /* allowed number of workers */ - ParallelSlot *parallelSlot; /* array of numWorkers slots */ + /* these arrays have numWorkers entries, one per worker: */ + TocEntry **te; /* item being worked on, or NULL */ + ParallelSlot *parallelSlot; /* private info about each worker */ } ParallelState; #ifdef WIN32 diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index e19c24aec9..bba8b6ca9f 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -4027,8 +4027,10 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, for (k = 0; k < pstate->numWorkers; k++) { - if (pstate->parallelSlot[k].workerStatus == WRKR_WORKING && - pstate->parallelSlot[k].te->section == SECTION_DATA) + TocEntry *running_te = pstate->te[k]; + + if (running_te != NULL && + running_te->section == SECTION_DATA) count++; } if (pstate->numWorkers == 0 || count * 4 < pstate->numWorkers) @@ -4049,12 +4051,10 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, */ for (i = 0; i < pstate->numWorkers; i++) { - TocEntry *running_te; + TocEntry *running_te = pstate->te[i]; - if (pstate->parallelSlot[i].workerStatus != WRKR_WORKING) + if (running_te == NULL) continue; - running_te = pstate->parallelSlot[i].te; - if (has_lock_conflicts(te, running_te) || has_lock_conflicts(running_te, te)) {