diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 02bc8feca7..1545f03656 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -818,7 +818,7 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
-         LWLock
+         LWLock
          ShmemIndexLock
          Waiting to find or allocate space in shared memory.
@@ -1069,6 +1069,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
          predicate_lock_manager
          Waiting to add or examine predicate lock information.
+
+         parallel_query_dsa
+         Waiting for parallel query dynamic shared memory allocation lock.
+
          Lock
          relation
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index f9c85989d8..8a6f844e35 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -34,6 +34,7 @@
 #include "optimizer/planner.h"
 #include "storage/spin.h"
 #include "tcop/tcopprot.h"
+#include "utils/dsa.h"
 #include "utils/memutils.h"
 #include "utils/snapmgr.h"

@@ -47,6 +48,7 @@
 #define PARALLEL_KEY_BUFFER_USAGE		UINT64CONST(0xE000000000000003)
 #define PARALLEL_KEY_TUPLE_QUEUE		UINT64CONST(0xE000000000000004)
 #define PARALLEL_KEY_INSTRUMENTATION	UINT64CONST(0xE000000000000005)
+#define PARALLEL_KEY_DSA				UINT64CONST(0xE000000000000006)

 #define PARALLEL_TUPLE_QUEUE_SIZE		65536

@@ -345,6 +347,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
 	int			param_len;
 	int			instrumentation_len = 0;
 	int			instrument_offset = 0;
+	Size		dsa_minsize = dsa_minimum_size();

 	/* Allocate object for return value. */
 	pei = palloc0(sizeof(ParallelExecutorInfo));
@@ -413,6 +416,10 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
 		shm_toc_estimate_keys(&pcxt->estimator, 1);
 	}

+	/* Estimate space for DSA area. */
+	shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
 	/* Everyone's had a chance to ask for space, so now create the DSM. */
 	InitializeParallelDSM(pcxt);
@@ -466,6 +473,29 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
 		pei->instrumentation = instrumentation;
 	}

+	/*
+	 * Create a DSA area that can be used by the leader and all workers.
+	 * (However, if we failed to create a DSM and are using private memory
+	 * instead, then skip this.)
+	 */
+	if (pcxt->seg != NULL)
+	{
+		char	   *area_space;
+
+		area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
+		shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
+		pei->area = dsa_create_in_place(area_space, dsa_minsize,
+										LWTRANCHE_PARALLEL_QUERY_DSA,
+										"parallel_query_dsa",
+										pcxt->seg);
+	}
+
+	/*
+	 * Make the area available to executor nodes running in the leader.  See
+	 * also ParallelQueryMain which makes it available to workers.
+	 */
+	estate->es_query_dsa = pei->area;
+
 	/*
 	 * Give parallel-aware nodes a chance to initialize their shared data.
 	 * This also initializes the elements of instrumentation->ps_instrument,
@@ -571,6 +601,11 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
 void
 ExecParallelCleanup(ParallelExecutorInfo *pei)
 {
+	if (pei->area != NULL)
+	{
+		dsa_detach(pei->area);
+		pei->area = NULL;
+	}
 	if (pei->pcxt != NULL)
 	{
 		DestroyParallelContext(pei->pcxt);
@@ -728,6 +763,8 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	QueryDesc  *queryDesc;
 	SharedExecutorInstrumentation *instrumentation;
 	int			instrument_options = 0;
+	void	   *area_space;
+	dsa_area   *area;

 	/* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
 	receiver = ExecParallelGetReceiver(seg, toc);
@@ -739,10 +776,21 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	/* Prepare to track buffer usage during query execution. */
 	InstrStartParallelQuery();

-	/* Start up the executor, have it run the plan, and then shut it down. */
+	/* Attach to the dynamic shared memory area. */
+	area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA);
+	area = dsa_attach_in_place(area_space, seg);
+
+	/* Start up the executor */
 	ExecutorStart(queryDesc, 0);
+
+	/* Special executor initialization steps for parallel workers */
+	queryDesc->planstate->state->es_query_dsa = area;
 	ExecParallelInitializeWorker(queryDesc->planstate, toc);
+
+	/* Run the plan */
 	ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+
+	/* Shut down the executor */
 	ExecutorFinish(queryDesc);

 	/* Report buffer usage during parallel execution. */
@@ -758,6 +806,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	ExecutorEnd(queryDesc);

 	/* Cleanup. */
+	dsa_detach(area);
 	FreeQueryDesc(queryDesc);
 	(*receiver->rDestroy) (receiver);
 }
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index f4c6d37a11..4bbee691a7 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -17,6 +17,7 @@
 #include "nodes/execnodes.h"
 #include "nodes/parsenodes.h"
 #include "nodes/plannodes.h"
+#include "utils/dsa.h"

 typedef struct SharedExecutorInstrumentation SharedExecutorInstrumentation;

@@ -27,6 +28,7 @@ typedef struct ParallelExecutorInfo
 	BufferUsage *buffer_usage;
 	SharedExecutorInstrumentation *instrumentation;
 	shm_mq_handle **tqueue;
+	dsa_area   *area;
 	bool		finished;
 } ParallelExecutorInfo;
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 703604ab9d..5c3b8683f5 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -427,6 +427,9 @@ typedef struct EState
 	HeapTuple  *es_epqTuple;	/* array of EPQ substitute tuples */
 	bool	   *es_epqTupleSet; /* true if EPQ tuple is provided */
 	bool	   *es_epqScanDone; /* true if EPQ tuple has been fetched */
+
+	/* The per-query shared memory area to use for parallel execution. */
+	struct dsa_area *es_query_dsa;
 } EState;
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index db1c687e21..3ca4db0a72 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -210,6 +210,7 @@ typedef enum BuiltinTrancheIds
 	LWTRANCHE_BUFFER_MAPPING,
 	LWTRANCHE_LOCK_MANAGER,
 	LWTRANCHE_PREDICATE_LOCK_MANAGER,
+	LWTRANCHE_PARALLEL_QUERY_DSA,
 	LWTRANCHE_FIRST_USER_DEFINED
 } BuiltinTrancheIds;
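
For illustration, here is a minimal sketch (not part of the patch) of how a
parallel-aware executor node could use the es_query_dsa area introduced above.
The node name "Demo", its shared-state struct, and the TOC key
PARALLEL_KEY_DEMO are hypothetical; dsa_allocate, dsa_get_address, dsa_pointer
and the shm_toc functions are the existing APIs this patch already uses.  The
leader allocates from the area in its DSM-initialization step and publishes a
relative pointer; each worker converts it to a backend-local address, which is
safe because ParallelQueryMain now sets es_query_dsa before
ExecParallelInitializeWorker runs.

	#include "postgres.h"

	#include "nodes/execnodes.h"
	#include "storage/shm_toc.h"
	#include "utils/dsa.h"

	/* Hypothetical TOC key for this node's fixed-size shared state. */
	#define PARALLEL_KEY_DEMO	UINT64CONST(0xE000000000000100)

	typedef struct DemoShared
	{
		int			nslots;		/* number of shared slots */
		dsa_pointer slots;		/* relative pointer into the DSA area */
	} DemoShared;

	/* Leader side: run while initializing the parallel DSM. */
	static void
	demo_initialize_dsm(EState *estate, shm_toc *toc, int nslots)
	{
		/*
		 * The fixed-size header must have been counted during the
		 * estimation phase, before the DSM segment was created.
		 */
		DemoShared *shared = shm_toc_allocate(toc, sizeof(DemoShared));

		shared->nslots = nslots;

		/*
		 * Unlike shm_toc_allocate, dsa_allocate can be called at execution
		 * time, after the segment has been sized; the returned dsa_pointer
		 * is meaningful in every backend attached to the same area.
		 */
		shared->slots = dsa_allocate(estate->es_query_dsa,
									 sizeof(dsa_pointer) * nslots);
		shm_toc_insert(toc, PARALLEL_KEY_DEMO, shared);
	}

	/* Worker side: run while initializing this worker's node state. */
	static void
	demo_initialize_worker(EState *estate, shm_toc *toc)
	{
		DemoShared *shared = shm_toc_lookup(toc, PARALLEL_KEY_DEMO);
		dsa_pointer *slots;

		/* Convert the relative pointer to an address in this process. */
		slots = (dsa_pointer *) dsa_get_address(estate->es_query_dsa,
												shared->slots);

		/* ... cooperate with the other participants using "slots" ... */
		(void) slots;
	}

Query-lifetime allocations like this need no explicit dsa_free: the leader
detaches in ExecParallelCleanup and each worker detaches in ParallelQueryMain,
so the whole area is torn down with the parallel DSM segment at the end of the
query.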