/*-------------------------------------------------------------------------
 *
 * parallel.c
 *    Infrastructure for launching parallel workers
 *
 * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/access/transam/parallel.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/xact.h"
#include "access/xlog.h"
#include "access/parallel.h"
#include "commands/async.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "libpq/pqmq.h"
#include "miscadmin.h"
#include "storage/ipc.h"
#include "storage/sinval.h"
#include "storage/spin.h"
#include "tcop/tcopprot.h"
#include "utils/combocid.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/resowner.h"
#include "utils/snapmgr.h"

/*
 * We don't want to waste a lot of memory on an error queue which, most of
 * the time, will process only a handful of small messages.  However, it is
 * desirable to make it large enough that a typical ErrorResponse can be sent
 * without blocking.  That way, a worker that errors out can write the whole
 * message into the queue and terminate without waiting for the user backend.
 */
#define PARALLEL_ERROR_QUEUE_SIZE           16384

/* Magic number for parallel context TOC. */
#define PARALLEL_MAGIC                      0x50477c7c

/*
 * Magic numbers for parallel state sharing.  Higher-level code should use
 * smaller values, leaving these very large ones for use by this module.
 */
#define PARALLEL_KEY_FIXED                  UINT64CONST(0xFFFFFFFFFFFF0001)
#define PARALLEL_KEY_ERROR_QUEUE            UINT64CONST(0xFFFFFFFFFFFF0002)
#define PARALLEL_KEY_LIBRARY                UINT64CONST(0xFFFFFFFFFFFF0003)
#define PARALLEL_KEY_GUC                    UINT64CONST(0xFFFFFFFFFFFF0004)
#define PARALLEL_KEY_COMBO_CID              UINT64CONST(0xFFFFFFFFFFFF0005)
#define PARALLEL_KEY_TRANSACTION_SNAPSHOT   UINT64CONST(0xFFFFFFFFFFFF0006)
#define PARALLEL_KEY_ACTIVE_SNAPSHOT        UINT64CONST(0xFFFFFFFFFFFF0007)
#define PARALLEL_KEY_TRANSACTION_STATE      UINT64CONST(0xFFFFFFFFFFFF0008)
#define PARALLEL_KEY_EXTENSION_TRAMPOLINE   UINT64CONST(0xFFFFFFFFFFFF0009)

/* Fixed-size parallel state. */
typedef struct FixedParallelState
{
    /* Fixed-size state that workers must restore. */
    Oid         database_id;
    Oid         authenticated_user_id;
    Oid         current_user_id;
    int         sec_context;
    PGPROC     *parallel_master_pgproc;
    pid_t       parallel_master_pid;
    BackendId   parallel_master_backend_id;

    /* Entrypoint for parallel workers. */
    parallel_worker_main_type entrypoint;

    /* Mutex protects remaining fields. */
    slock_t     mutex;

    /* Track whether workers have attached. */
    int         workers_expected;
    int         workers_attached;

    /* Maximum XactLastRecEnd of any worker. */
    XLogRecPtr  last_xlog_end;
} FixedParallelState;

/*
 * Our parallel worker number.  We initialize this to -1, meaning that we are
 * not a parallel worker.  In parallel workers, it will be set to a value >= 0
 * and < the number of workers before any user code is invoked; each parallel
 * worker will get a different parallel worker number.
 */
int         ParallelWorkerNumber = -1;

/* Is there a parallel message pending which we need to receive? */
bool        ParallelMessagePending = false;

/* Pointer to our fixed parallel state. */
static FixedParallelState *MyFixedParallelState;

/* List of active parallel contexts. */
static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
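
/*
 * A hypothetical usage sketch (not part of this module, and the names
 * my_worker_main, MY_KEY_DATA, mydata, and size_of_my_data are illustrative
 * only): code that drives the functions defined below would typically
 * proceed roughly like this, choosing its own small TOC key values per the
 * comment above about PARALLEL_KEY_*.
 *
 *      EnterParallelMode();
 *      pcxt = CreateParallelContext(my_worker_main, nworkers);
 *      shm_toc_estimate_chunk(&pcxt->estimator, size_of_my_data);
 *      shm_toc_estimate_keys(&pcxt->estimator, 1);
 *      InitializeParallelDSM(pcxt);
 *      mydata = shm_toc_allocate(pcxt->toc, size_of_my_data);
 *      ... fill in mydata ...
 *      shm_toc_insert(pcxt->toc, MY_KEY_DATA, mydata);
 *      LaunchParallelWorkers(pcxt);
 *      ... perform, or coordinate, the parallel work ...
 *      WaitForParallelWorkersToFinish(pcxt);
 *      DestroyParallelContext(pcxt);
 *      ExitParallelMode();
 *
 * In my_worker_main(seg, toc), each worker would look up its shared state
 * with shm_toc_lookup(toc, MY_KEY_DATA) and could use ParallelWorkerNumber
 * to decide which portion of the work is its own.
 */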
/* Private functions. */
static void HandleParallelMessage(ParallelContext *, int, StringInfo msg);
static void ParallelErrorContext(void *arg);
static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc);
static void ParallelWorkerMain(Datum main_arg);

/*
 * Establish a new parallel context.  This should be done after entering
 * parallel mode, and (unless there is an error) the context should be
 * destroyed before exiting the current subtransaction.
 */
ParallelContext *
CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers)
{
    MemoryContext oldcontext;
    ParallelContext *pcxt;

    /* It is unsafe to create a parallel context if not in parallel mode. */
    Assert(IsInParallelMode());

    /* Number of workers should be non-negative. */
    Assert(nworkers >= 0);

    /*
     * If dynamic shared memory is not available, we won't be able to use
     * background workers.
     */
    if (dynamic_shared_memory_type == DSM_IMPL_NONE)
        nworkers = 0;

    /* We might be running in a short-lived memory context. */
    oldcontext = MemoryContextSwitchTo(TopTransactionContext);

    /* Initialize a new ParallelContext. */
    pcxt = palloc0(sizeof(ParallelContext));
    pcxt->subid = GetCurrentSubTransactionId();
    pcxt->nworkers = nworkers;
    pcxt->entrypoint = entrypoint;
    pcxt->error_context_stack = error_context_stack;
    shm_toc_initialize_estimator(&pcxt->estimator);
    dlist_push_head(&pcxt_list, &pcxt->node);

    /* Restore previous memory context. */
    MemoryContextSwitchTo(oldcontext);

    return pcxt;
}

/*
 * Establish a new parallel context that calls a function provided by an
 * extension.  This works around the fact that the library might get mapped
 * at a different address in each backend.
 */
ParallelContext *
CreateParallelContextForExternalFunction(char *library_name,
                                         char *function_name,
                                         int nworkers)
{
    MemoryContext oldcontext;
    ParallelContext *pcxt;

    /* We might be running in a very short-lived memory context. */
    oldcontext = MemoryContextSwitchTo(TopTransactionContext);

    /* Create the context. */
    pcxt = CreateParallelContext(ParallelExtensionTrampoline, nworkers);
    pcxt->library_name = pstrdup(library_name);
    pcxt->function_name = pstrdup(function_name);

    /* Restore previous memory context. */
    MemoryContextSwitchTo(oldcontext);

    return pcxt;
}

/*
 * Establish the dynamic shared memory segment for a parallel context and
 * copy state and other bookkeeping information that will be needed by
 * parallel workers into it.
 */
void
InitializeParallelDSM(ParallelContext *pcxt)
{
    MemoryContext oldcontext;
    Size        library_len = 0;
    Size        guc_len = 0;
    Size        combocidlen = 0;
    Size        tsnaplen = 0;
    Size        asnaplen = 0;
    Size        tstatelen = 0;
    Size        segsize = 0;
    int         i;
    FixedParallelState *fps;
    Snapshot    transaction_snapshot = GetTransactionSnapshot();
    Snapshot    active_snapshot = GetActiveSnapshot();

    /* We might be running in a very short-lived memory context. */
    oldcontext = MemoryContextSwitchTo(TopTransactionContext);

    /* Allow space to store the fixed-size parallel state. */
    shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState));
    shm_toc_estimate_keys(&pcxt->estimator, 1);

    /*
     * Normally, the user will have requested at least one worker process,
     * but if by chance they have not, we can skip a bunch of things here.
     */
    if (pcxt->nworkers > 0)
    {
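        /*
         * As an illustrative aside (MY_KEY and mysize are hypothetical names,
         * not used in this module): higher-level callers follow the same
         * estimate-then-allocate discipline as the code below for any state
         * of their own, estimating before InitializeParallelDSM is called,
         *
         *      shm_toc_estimate_chunk(&pcxt->estimator, mysize);
         *      shm_toc_estimate_keys(&pcxt->estimator, 1);
         *
         * and then allocating and inserting afterward,
         *
         *      myspace = shm_toc_allocate(pcxt->toc, mysize);
         *      shm_toc_insert(pcxt->toc, MY_KEY, myspace);
         */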
        /* Estimate space for various kinds of state sharing. */
        library_len = EstimateLibraryStateSpace();
        shm_toc_estimate_chunk(&pcxt->estimator, library_len);
        guc_len = EstimateGUCStateSpace();
        shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
        combocidlen = EstimateComboCIDStateSpace();
        shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
        tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
        shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
        asnaplen = EstimateSnapshotSpace(active_snapshot);
        shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
        tstatelen = EstimateTransactionStateSpace();
        shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
        /* If you add more chunks here, you probably need to add keys. */
        shm_toc_estimate_keys(&pcxt->estimator, 6);

        /* Estimate space needed for error queues. */
        StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
                         PARALLEL_ERROR_QUEUE_SIZE,
                         "parallel error queue size not buffer-aligned");
        shm_toc_estimate_chunk(&pcxt->estimator,
                               PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers);
        shm_toc_estimate_keys(&pcxt->estimator, 1);

        /* Estimate how much we'll need for extension entrypoint info. */
        if (pcxt->library_name != NULL)
        {
            Assert(pcxt->entrypoint == ParallelExtensionTrampoline);
            Assert(pcxt->function_name != NULL);
            shm_toc_estimate_chunk(&pcxt->estimator,
                                   strlen(pcxt->library_name) +
                                   strlen(pcxt->function_name) + 2);
            shm_toc_estimate_keys(&pcxt->estimator, 1);
        }
    }

    /*
     * Create DSM and initialize with new table of contents.  But if the user
     * didn't request any workers, then don't bother creating a dynamic
     * shared memory segment; instead, just use backend-private memory.
     *
     * Also, if we can't create a dynamic shared memory segment because the
     * maximum number of segments has already been created, then fall back
     * to backend-private memory, and plan not to use any workers.  We hope
     * this won't happen very often, but it's better to abandon the use of
     * parallelism than to fail outright.
     */
    segsize = shm_toc_estimate(&pcxt->estimator);
    if (pcxt->nworkers != 0)
        pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
    if (pcxt->seg != NULL)
        pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
                                   dsm_segment_address(pcxt->seg),
                                   segsize);
    else
    {
        pcxt->nworkers = 0;
        pcxt->private_memory = MemoryContextAlloc(TopMemoryContext, segsize);
        pcxt->toc = shm_toc_create(PARALLEL_MAGIC, pcxt->private_memory,
                                   segsize);
    }

    /* Initialize fixed-size state in shared memory. */
    fps = (FixedParallelState *)
        shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState));
    fps->database_id = MyDatabaseId;
    fps->authenticated_user_id = GetAuthenticatedUserId();
    GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context);
    fps->parallel_master_pgproc = MyProc;
    fps->parallel_master_pid = MyProcPid;
    fps->parallel_master_backend_id = MyBackendId;
    fps->entrypoint = pcxt->entrypoint;
    SpinLockInit(&fps->mutex);
    fps->workers_expected = pcxt->nworkers;
    fps->workers_attached = 0;
    fps->last_xlog_end = 0;
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);

    /* We can skip the rest of this if we're not budgeting for any workers. */
    if (pcxt->nworkers > 0)
    {
        char       *libraryspace;
        char       *gucspace;
        char       *combocidspace;
        char       *tsnapspace;
        char       *asnapspace;
        char       *tstatespace;
        char       *error_queue_space;

        /* Serialize shared libraries we have loaded. */
        libraryspace = shm_toc_allocate(pcxt->toc, library_len);
        SerializeLibraryState(library_len, libraryspace);
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_LIBRARY, libraryspace);
        /* Serialize GUC settings. */
        gucspace = shm_toc_allocate(pcxt->toc, guc_len);
        SerializeGUCState(guc_len, gucspace);
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);

        /* Serialize combo CID state. */
        combocidspace = shm_toc_allocate(pcxt->toc, combocidlen);
        SerializeComboCIDState(combocidlen, combocidspace);
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);

        /* Serialize transaction snapshot and active snapshot. */
        tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
        SerializeSnapshot(transaction_snapshot, tsnapspace);
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT,
                       tsnapspace);
        asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
        SerializeSnapshot(active_snapshot, asnapspace);
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);

        /* Serialize transaction state. */
        tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);
        SerializeTransactionState(tstatelen, tstatespace);
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);

        /* Allocate space for worker information. */
        pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);

        /*
         * Establish error queues in dynamic shared memory.
         *
         * These queues should be used only for transmitting ErrorResponse,
         * NoticeResponse, and NotifyResponse protocol messages.  Tuple data
         * should be transmitted via separate (possibly larger?) queues.
         */
        error_queue_space =
            shm_toc_allocate(pcxt->toc,
                             PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers);
        for (i = 0; i < pcxt->nworkers; ++i)
        {
            char       *start;
            shm_mq     *mq;

            start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
            mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
            shm_mq_set_receiver(mq, MyProc);
            pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
        }
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);

        /* Serialize extension entrypoint information. */
        if (pcxt->library_name != NULL)
        {
            Size        lnamelen = strlen(pcxt->library_name);
            char       *extensionstate;

            extensionstate = shm_toc_allocate(pcxt->toc, lnamelen +
                                              strlen(pcxt->function_name) + 2);
            strcpy(extensionstate, pcxt->library_name);
            strcpy(extensionstate + lnamelen + 1, pcxt->function_name);
            shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE,
                           extensionstate);
        }
    }

    /* Restore previous memory context. */
    MemoryContextSwitchTo(oldcontext);
}

/*
 * Launch parallel workers.
 */
void
LaunchParallelWorkers(ParallelContext *pcxt)
{
    MemoryContext oldcontext;
    BackgroundWorker worker;
    int         i;
    bool        any_registrations_failed = false;

    /* Skip this if we have no workers. */
    if (pcxt->nworkers == 0)
        return;

    /* If we do have workers, we'd better have a DSM segment. */
    Assert(pcxt->seg != NULL);

    /* We might be running in a short-lived memory context. */
    oldcontext = MemoryContextSwitchTo(TopTransactionContext);

    /* Configure a worker. */
    snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
             MyProcPid);
    worker.bgw_flags =
        BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
    worker.bgw_start_time = BgWorkerStart_ConsistentState;
    worker.bgw_restart_time = BGW_NEVER_RESTART;
    worker.bgw_main = ParallelWorkerMain;
    worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
    worker.bgw_notify_pid = MyProcPid;
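
    /*
     * As an illustration only (this module does not do this itself): because
     * registration can fail, a caller that needs to know how many workers
     * were actually registered could, after this function returns, count the
     * slots that still have a handle, e.g.
     *
     *      int     nlaunched = 0;
     *
     *      for (i = 0; i < pcxt->nworkers; ++i)
     *          if (pcxt->worker[i].bgwhandle != NULL)
     *              ++nlaunched;
     *
     * Slots whose registration failed are set to NULL in the loop below.
     */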

    /*
     * Start workers.
     *
     * The caller must be able to tolerate ending up with fewer workers than
     * expected, so there is no need to throw an error here if registration
     * fails.  It wouldn't help much anyway, because registering the worker
     * in no way guarantees that it will start up and initialize
     * successfully.
     */
    for (i = 0; i < pcxt->nworkers; ++i)
    {
        if (!any_registrations_failed &&
            RegisterDynamicBackgroundWorker(&worker,
                                            &pcxt->worker[i].bgwhandle))
            shm_mq_set_handle(pcxt->worker[i].error_mqh,
                              pcxt->worker[i].bgwhandle);
        else
        {
            /*
             * If we weren't able to register the worker, then we've bumped
             * up against the max_worker_processes limit, and future
             * registrations will probably fail too, so arrange to skip them.
             * But we still have to execute this code for the remaining slots
             * to make sure that we forget about the error queues we budgeted
             * for those workers.  Otherwise, we'll wait for them to start,
             * but they never will.
             */
            any_registrations_failed = true;
            pcxt->worker[i].bgwhandle = NULL;
            pcxt->worker[i].error_mqh = NULL;
        }
    }

    /* Restore previous memory context. */
    MemoryContextSwitchTo(oldcontext);
}

/*
 * Wait for all workers to exit.
 *
 * Even if the parallel operation seems to have completed successfully, it's
 * important to call this function afterwards.  We must not miss any errors
 * the workers may have thrown during the parallel operation, or any that
 * they may yet throw while shutting down.
 *
 * Also, we want to update our notion of XactLastRecEnd based on worker
 * feedback.
 */
void
WaitForParallelWorkersToFinish(ParallelContext *pcxt)
{
    for (;;)
    {
        bool        anyone_alive = false;
        int         i;

        /*
         * This will process any parallel messages that are pending, which
         * may change the outcome of the loop that follows.  It may also
         * throw an error propagated from a worker.
         */
        CHECK_FOR_INTERRUPTS();

        for (i = 0; i < pcxt->nworkers; ++i)
        {
            if (pcxt->worker[i].error_mqh != NULL)
            {
                anyone_alive = true;
                break;
            }
        }

        if (!anyone_alive)
            break;

        WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1);
        ResetLatch(&MyProc->procLatch);
    }

    if (pcxt->toc != NULL)
    {
        FixedParallelState *fps;

        fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
        if (fps->last_xlog_end > XactLastRecEnd)
            XactLastRecEnd = fps->last_xlog_end;
    }
}

/*
 * Destroy a parallel context.
 *
 * If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
 * first, before calling this function.  When this function is invoked, any
 * remaining workers are forcibly killed; the dynamic shared memory segment
 * is unmapped; and we then wait (uninterruptibly) for the workers to exit.
 */
void
DestroyParallelContext(ParallelContext *pcxt)
{
    int         i;

    /*
     * Be careful about order of operations here!  We remove the parallel
     * context from the list before we do anything else; otherwise, if an
     * error occurs during a subsequent step, we might try to nuke it again
     * from AtEOXact_Parallel or AtEOSubXact_Parallel.
     */
    dlist_delete(&pcxt->node);

    /* Kill each worker in turn, and forget their error queues. */
    for (i = 0; i < pcxt->nworkers; ++i)
    {
        if (pcxt->worker[i].bgwhandle != NULL)
            TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
        if (pcxt->worker[i].error_mqh != NULL)
        {
            pfree(pcxt->worker[i].error_mqh);
            pcxt->worker[i].error_mqh = NULL;
        }
    }

    /*
     * If we have allocated a shared memory segment, detach it.  This will
     * implicitly detach the error queues, and any other shared memory
     * queues, stored there.
     */
    if (pcxt->seg != NULL)
    {
        dsm_detach(pcxt->seg);
        pcxt->seg = NULL;
    }

    /*
     * If this parallel context is actually in backend-private memory rather
     * than shared memory, free that memory instead.
     */
    if (pcxt->private_memory != NULL)
    {
        pfree(pcxt->private_memory);
        pcxt->private_memory = NULL;
    }

    /* Wait until the workers actually die. */
    for (i = 0; i < pcxt->nworkers; ++i)
    {
        BgwHandleStatus status;

        if (pcxt->worker[i].bgwhandle == NULL)
            continue;

        /*
         * We can't finish transaction commit or abort until all of the
         * workers are dead.  This means, in particular, that we can't
         * respond to interrupts at this stage.
         */
        HOLD_INTERRUPTS();
        status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
        RESUME_INTERRUPTS();

        /*
         * If the postmaster kicked the bucket, we have no chance of cleaning
         * up safely -- we won't be able to tell when our workers are
         * actually dead.  This doesn't necessitate a PANIC since they will
         * all abort eventually, but we can't safely continue this session.
         */
        if (status == BGWH_POSTMASTER_DIED)
            ereport(FATAL,
                    (errcode(ERRCODE_ADMIN_SHUTDOWN),
                     errmsg("postmaster exited during a parallel transaction")));

        /* Release memory. */
        pfree(pcxt->worker[i].bgwhandle);
        pcxt->worker[i].bgwhandle = NULL;
    }

    /* Free the worker array itself. */
    if (pcxt->worker != NULL)
    {
        pfree(pcxt->worker);
        pcxt->worker = NULL;
    }

    /* Free memory. */
    pfree(pcxt);
}

/*
 * Are there any parallel contexts currently active?
 */
bool
ParallelContextActive(void)
{
    return !dlist_is_empty(&pcxt_list);
}

/*
 * Handle receipt of an interrupt indicating a parallel worker message.
 */
void
HandleParallelMessageInterrupt(void)
{
    int         save_errno = errno;

    InterruptPending = true;
    ParallelMessagePending = true;
    SetLatch(MyLatch);

    errno = save_errno;
}

/*
 * Handle any queued protocol messages received from parallel workers.
 */
void
HandleParallelMessages(void)
{
    dlist_iter  iter;

    ParallelMessagePending = false;

    dlist_foreach(iter, &pcxt_list)
    {
        ParallelContext *pcxt;
        int         i;
        Size        nbytes;
        void       *data;

        pcxt = dlist_container(ParallelContext, node, iter.cur);
        if (pcxt->worker == NULL)
            continue;

        for (i = 0; i < pcxt->nworkers; ++i)
        {
            /*
             * Read as many messages as we can from each worker, but stop
             * when either (1) the error queue goes away, which can happen if
             * we receive a Terminate message from the worker; or (2) no more
             * messages can be read from the worker without blocking.
             */
            while (pcxt->worker[i].error_mqh != NULL)
            {
                shm_mq_result res;

                res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
                                     &data, true);
                if (res == SHM_MQ_WOULD_BLOCK)
                    break;
                else if (res == SHM_MQ_SUCCESS)
                {
                    StringInfoData msg;

                    initStringInfo(&msg);
                    appendBinaryStringInfo(&msg, data, nbytes);
                    HandleParallelMessage(pcxt, i, &msg);
                    pfree(msg.data);
                }
                else
                    ereport(ERROR,
                            (errcode(ERRCODE_INTERNAL_ERROR),   /* XXX: wrong errcode? */
                             errmsg("lost connection to parallel worker")));

                /* This might make the error queue go away. */
                CHECK_FOR_INTERRUPTS();
            }
        }
    }
}

/*
 * Handle a single protocol message received from a single parallel worker.
 */
static void
HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
{
    char        msgtype;

    msgtype = pq_getmsgbyte(msg);

    switch (msgtype)
    {
        case 'K':               /* BackendKeyData */
            {
                int32       pid = pq_getmsgint(msg, 4);

                (void) pq_getmsgint(msg, 4);    /* discard cancel key */
                (void) pq_getmsgend(msg);
                pcxt->worker[i].pid = pid;
                break;
            }

        case 'E':               /* ErrorResponse */
        case 'N':               /* NoticeResponse */
            {
                ErrorData   edata;
                ErrorContextCallback errctx;
                ErrorContextCallback *save_error_context_stack;

                /*
                 * Rethrow the error using the error context callbacks that
                 * were in effect when the context was created, not the
                 * current ones.
                 */
                save_error_context_stack = error_context_stack;
                errctx.callback = ParallelErrorContext;
                errctx.arg = &pcxt->worker[i].pid;
                errctx.previous = pcxt->error_context_stack;
                error_context_stack = &errctx;

                /* Parse ErrorResponse or NoticeResponse. */
                pq_parse_errornotice(msg, &edata);

                /* Death of a worker isn't enough justification for suicide. */
                edata.elevel = Min(edata.elevel, ERROR);

                /* Rethrow error or notice. */
                ThrowErrorData(&edata);

                /* Restore previous context. */
                error_context_stack = save_error_context_stack;

                break;
            }

        case 'A':               /* NotifyResponse */
            {
                /* Propagate NotifyResponse. */
                pq_putmessage(msg->data[0], &msg->data[1], msg->len - 1);
                break;
            }

        case 'X':               /* Terminate, indicating clean exit */
            {
                pfree(pcxt->worker[i].bgwhandle);
                pfree(pcxt->worker[i].error_mqh);
                pcxt->worker[i].bgwhandle = NULL;
                pcxt->worker[i].error_mqh = NULL;
                break;
            }

        default:
            {
                elog(ERROR, "unknown message type: %c (%d bytes)",
                     msgtype, msg->len);
            }
    }
}

/*
 * End-of-subtransaction cleanup for parallel contexts.
 *
 * Currently, it's forbidden to enter or leave a subtransaction while
 * parallel mode is in effect, so we could just blow away everything.  But
 * we may want to relax that restriction in the future, so this code
 * contemplates that there may be multiple subtransaction IDs in pcxt_list.
 */
void
AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
{
    while (!dlist_is_empty(&pcxt_list))
    {
        ParallelContext *pcxt;

        pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
        if (pcxt->subid != mySubId)
            break;
        if (isCommit)
            elog(WARNING, "leaked parallel context");
        DestroyParallelContext(pcxt);
    }
}

/*
 * End-of-transaction cleanup for parallel contexts.
 */
void
AtEOXact_Parallel(bool isCommit)
{
    while (!dlist_is_empty(&pcxt_list))
    {
        ParallelContext *pcxt;

        pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
        if (isCommit)
            elog(WARNING, "leaked parallel context");
        DestroyParallelContext(pcxt);
    }
}

/*
 * Main entrypoint for parallel workers.
 */
static void
ParallelWorkerMain(Datum main_arg)
{
    dsm_segment *seg;
    shm_toc    *toc;
    FixedParallelState *fps;
    char       *error_queue_space;
    shm_mq     *mq;
    shm_mq_handle *mqh;
    char       *libraryspace;
    char       *gucspace;
    char       *combocidspace;
    char       *tsnapspace;
    char       *asnapspace;
    char       *tstatespace;
    StringInfoData msgbuf;

    /* Establish signal handlers. */
    pqsignal(SIGTERM, die);
    BackgroundWorkerUnblockSignals();

    /* Set up a memory context and resource owner. */
    Assert(CurrentResourceOwner == NULL);
    CurrentResourceOwner = ResourceOwnerCreate(NULL, "parallel toplevel");
    CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
                                                 "parallel worker",
                                                 ALLOCSET_DEFAULT_MINSIZE,
                                                 ALLOCSET_DEFAULT_INITSIZE,
                                                 ALLOCSET_DEFAULT_MAXSIZE);

    /*
     * Now that we have a resource owner, we can attach to the dynamic shared
     * memory segment and read the table of contents.
     */
    seg = dsm_attach(DatumGetUInt32(main_arg));
    if (seg == NULL)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("unable to map dynamic shared memory segment")));
    toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));
    if (toc == NULL)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("bad magic number in dynamic shared memory segment")));

    /* Determine and set our worker number. */
    fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED);
    Assert(fps != NULL);
    Assert(ParallelWorkerNumber == -1);
    SpinLockAcquire(&fps->mutex);
    if (fps->workers_attached < fps->workers_expected)
        ParallelWorkerNumber = fps->workers_attached++;
    SpinLockRelease(&fps->mutex);
    if (ParallelWorkerNumber < 0)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("too many parallel workers already attached")));
    MyFixedParallelState = fps;

    /*
     * Now that we have a worker number, we can find and attach to the error
     * queue provided for us.  That's good, because until we do that, any
     * errors that happen here will not be reported back to the process that
     * requested that this worker be launched.
     */
    error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE);
    mq = (shm_mq *) (error_queue_space +
                     ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
    shm_mq_set_sender(mq, MyProc);
    mqh = shm_mq_attach(mq, seg, NULL);
    pq_redirect_to_shm_mq(mq, mqh);
    pq_set_parallel_master(fps->parallel_master_pid,
                           fps->parallel_master_backend_id);

    /*
     * Send a BackendKeyData message to the process that initiated
     * parallelism so that it has access to our PID before it receives any
     * other messages from us.  Our cancel key is sent, too, since that's the
     * way the protocol message is defined, but it won't actually be used for
     * anything in this case.
     */
    pq_beginmessage(&msgbuf, 'K');
    pq_sendint(&msgbuf, (int32) MyProcPid, sizeof(int32));
    pq_sendint(&msgbuf, (int32) MyCancelKey, sizeof(int32));
    pq_endmessage(&msgbuf);

    /*
     * Hooray!  Primary initialization is complete.  Now, we need to set up
     * our backend-local state to match the original backend.
     */

    /*
     * Load libraries that were loaded by original backend.  We want to do
     * this before restoring GUCs, because the libraries might define custom
     * variables.
     */
    libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY);
    Assert(libraryspace != NULL);
    RestoreLibraryState(libraryspace);

    /* Restore database connection. */
    BackgroundWorkerInitializeConnectionByOid(fps->database_id,
                                              fps->authenticated_user_id);

    /* Restore GUC values from launching backend. */
    gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC);
    Assert(gucspace != NULL);
    StartTransactionCommand();
    RestoreGUCState(gucspace);
    CommitTransactionCommand();

    /* Crank up a transaction state appropriate to a parallel worker. */
    tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE);
    StartParallelWorkerTransaction(tstatespace);

    /* Restore combo CID state. */
    combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID);
    Assert(combocidspace != NULL);
    RestoreComboCIDState(combocidspace);

    /* Restore transaction snapshot. */
    tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT);
    Assert(tsnapspace != NULL);
    RestoreTransactionSnapshot(RestoreSnapshot(tsnapspace),
                               fps->parallel_master_pgproc);

    /* Restore active snapshot. */
    asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT);
    Assert(asnapspace != NULL);
    PushActiveSnapshot(RestoreSnapshot(asnapspace));

    /* Restore user ID and security context. */
    SetUserIdAndSecContext(fps->current_user_id, fps->sec_context);

    /*
     * We've initialized all of our state now; nothing should change
     * hereafter.
     */
    EnterParallelMode();

    /*
     * Time to do the real work: invoke the caller-supplied code.
     *
     * If you get a crash at this line, see the comments for
     * ParallelExtensionTrampoline.
     */
    fps->entrypoint(seg, toc);

    /* Must exit parallel mode to pop active snapshot. */
    ExitParallelMode();

    /* Must pop active snapshot so resowner.c doesn't complain. */
    PopActiveSnapshot();

    /* Shut down the parallel-worker transaction. */
    EndParallelWorkerTransaction();

    /* Report success. */
    pq_putmessage('X', NULL, 0);
}
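
/*
 * A hypothetical sketch of the extension-side usage that the trampoline
 * below supports (the names "my_extension", my_extension_main, and
 * MY_KEY_DATA are illustrative, not part of this module).  The extension
 * registers its entrypoint by library and function name rather than by
 * function pointer,
 *
 *      pcxt = CreateParallelContextForExternalFunction("my_extension",
 *                                                      "my_extension_main",
 *                                                      nworkers);
 *
 * and provides a worker entrypoint with the parallel_worker_main_type
 * signature, which each worker reaches via ParallelExtensionTrampoline:
 *
 *      void
 *      my_extension_main(dsm_segment *seg, shm_toc *toc)
 *      {
 *          void   *mydata = shm_toc_lookup(toc, MY_KEY_DATA);
 *
 *          ... do this worker's share of the work ...
 *      }
 */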
/*
 * It's unsafe for the entrypoint invoked by ParallelWorkerMain to be a
 * function living in a dynamically loaded module, because the module might
 * not be loaded in every process, or might be loaded but not at the same
 * address.  To work around that problem,
 * CreateParallelContextForExternalFunction() arranges to call this function
 * rather than calling the extension-provided function directly; and this
 * function then looks up the real entrypoint and calls it.
 */
static void
ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc)
{
    char       *extensionstate;
    char       *library_name;
    char       *function_name;
    parallel_worker_main_type entrypt;

    extensionstate = shm_toc_lookup(toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE);
    Assert(extensionstate != NULL);
    library_name = extensionstate;
    function_name = extensionstate + strlen(library_name) + 1;

    entrypt = (parallel_worker_main_type)
        load_external_function(library_name, function_name, true, NULL);
    entrypt(seg, toc);
}

/*
 * Give the user a hint that this is a message propagated from a parallel
 * worker.  Otherwise, it can sometimes be confusing to understand what
 * actually happened.
 */
static void
ParallelErrorContext(void *arg)
{
    errcontext("parallel worker, pid %d", *(int32 *) arg);
}

/*
 * Update shared memory with the ending location of the last WAL record we
 * wrote, if it's greater than the value already stored there.
 */
void
ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
{
    FixedParallelState *fps = MyFixedParallelState;

    Assert(fps != NULL);
    SpinLockAcquire(&fps->mutex);
    if (fps->last_xlog_end < last_xlog_end)
        fps->last_xlog_end = last_xlog_end;
    SpinLockRelease(&fps->mutex);
}
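
/*
 * For illustration only (no such call appears in this file; where it is
 * placed is up to worker-side code): a parallel worker that has written WAL
 * would report its ending WAL position before exiting, e.g.
 *
 *      ParallelWorkerReportLastRecEnd(XactLastRecEnd);
 *
 * so that WaitForParallelWorkersToFinish() can fold the largest reported
 * value back into the master's XactLastRecEnd.
 */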