diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index ce47f1d4a8..ad9eba63dd 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -47,16 +47,25 @@ * greater than any 32-bit integer here so that values < 2^32 can be used * by individual parallel nodes to store their own state. */ -#define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000001) -#define PARALLEL_KEY_PARAMS UINT64CONST(0xE000000000000002) -#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000003) -#define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000004) -#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000005) -#define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000006) -#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000007) +#define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001) +#define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002) +#define PARALLEL_KEY_PARAMS UINT64CONST(0xE000000000000003) +#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004) +#define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005) +#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006) +#define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007) +#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008) #define PARALLEL_TUPLE_QUEUE_SIZE 65536 +/* + * Fixed-size random stuff that we need to pass to parallel workers. + */ +typedef struct FixedParallelExecutorState +{ + int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */ +} FixedParallelExecutorState; + /* * DSM structure for accumulating per-PlanState instrumentation. * @@ -381,12 +390,14 @@ ExecParallelReinitialize(ParallelExecutorInfo *pei) * execution and return results to the main backend. */ ParallelExecutorInfo * -ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) +ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers, + int64 tuples_needed) { ParallelExecutorInfo *pei; ParallelContext *pcxt; ExecParallelEstimateContext e; ExecParallelInitializeDSMContext d; + FixedParallelExecutorState *fpes; char *pstmt_data; char *pstmt_space; char *param_space; @@ -418,6 +429,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) * for the various things we need to store. */ + /* Estimate space for fixed-size state. */ + shm_toc_estimate_chunk(&pcxt->estimator, + sizeof(FixedParallelExecutorState)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for query text. */ query_len = strlen(estate->es_sourceText); shm_toc_estimate_chunk(&pcxt->estimator, query_len); @@ -487,6 +503,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) * asked for has been allocated or initialized yet, though, so do that. */ + /* Store fixed-size state. */ + fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState)); + fpes->tuples_needed = tuples_needed; + shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes); + /* Store query string */ query_string = shm_toc_allocate(pcxt->toc, query_len); memcpy(query_string, estate->es_sourceText, query_len); @@ -833,6 +854,7 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc) void ParallelQueryMain(dsm_segment *seg, shm_toc *toc) { + FixedParallelExecutorState *fpes; BufferUsage *buffer_usage; DestReceiver *receiver; QueryDesc *queryDesc; @@ -841,6 +863,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) void *area_space; dsa_area *area; + /* Get fixed-size state. */ + fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false); + /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */ receiver = ExecParallelGetReceiver(seg, toc); instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true); @@ -868,8 +893,17 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) queryDesc->planstate->state->es_query_dsa = area; ExecParallelInitializeWorker(queryDesc->planstate, toc); - /* Run the plan */ - ExecutorRun(queryDesc, ForwardScanDirection, 0L, true); + /* Pass down any tuple bound */ + ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate); + + /* + * Run the plan. If we specified a tuple bound, be careful not to demand + * more tuples than that. + */ + ExecutorRun(queryDesc, + ForwardScanDirection, + fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed, + true); /* Shut down the executor */ ExecutorFinish(queryDesc); diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index 36d2914249..c1aa5064c9 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -757,3 +757,124 @@ ExecShutdownNode(PlanState *node) return false; } + +/* + * ExecSetTupleBound + * + * Set a tuple bound for a planstate node. This lets child plan nodes + * optimize based on the knowledge that the maximum number of tuples that + * their parent will demand is limited. The tuple bound for a node may + * only be changed between scans (i.e., after node initialization or just + * before an ExecReScan call). + * + * Any negative tuples_needed value means "no limit", which should be the + * default assumption when this is not called at all for a particular node. + * + * Note: if this is called repeatedly on a plan tree, the exact same set + * of nodes must be updated with the new limit each time; be careful that + * only unchanging conditions are tested here. + */ +void +ExecSetTupleBound(int64 tuples_needed, PlanState *child_node) +{ + /* + * Since this function recurses, in principle we should check stack depth + * here. In practice, it's probably pointless since the earlier node + * initialization tree traversal would surely have consumed more stack. + */ + + if (IsA(child_node, SortState)) + { + /* + * If it is a Sort node, notify it that it can use bounded sort. + * + * Note: it is the responsibility of nodeSort.c to react properly to + * changes of these parameters. If we ever redesign this, it'd be a + * good idea to integrate this signaling with the parameter-change + * mechanism. + */ + SortState *sortState = (SortState *) child_node; + + if (tuples_needed < 0) + { + /* make sure flag gets reset if needed upon rescan */ + sortState->bounded = false; + } + else + { + sortState->bounded = true; + sortState->bound = tuples_needed; + } + } + else if (IsA(child_node, MergeAppendState)) + { + /* + * If it is a MergeAppend, we can apply the bound to any nodes that + * are children of the MergeAppend, since the MergeAppend surely need + * read no more than that many tuples from any one input. + */ + MergeAppendState *maState = (MergeAppendState *) child_node; + int i; + + for (i = 0; i < maState->ms_nplans; i++) + ExecSetTupleBound(tuples_needed, maState->mergeplans[i]); + } + else if (IsA(child_node, ResultState)) + { + /* + * Similarly, for a projecting Result, we can apply the bound to its + * child node. + * + * If Result supported qual checking, we'd have to punt on seeing a + * qual. Note that having a resconstantqual is not a showstopper: if + * that condition succeeds it affects nothing, while if it fails, no + * rows will be demanded from the Result child anyway. + */ + if (outerPlanState(child_node)) + ExecSetTupleBound(tuples_needed, outerPlanState(child_node)); + } + else if (IsA(child_node, SubqueryScanState)) + { + /* + * We can also descend through SubqueryScan, but only if it has no + * qual (otherwise it might discard rows). + */ + SubqueryScanState *subqueryState = (SubqueryScanState *) child_node; + + if (subqueryState->ss.ps.qual == NULL) + ExecSetTupleBound(tuples_needed, subqueryState->subplan); + } + else if (IsA(child_node, GatherState)) + { + /* + * A Gather node can propagate the bound to its workers. As with + * MergeAppend, no one worker could possibly need to return more + * tuples than the Gather itself needs to. + * + * Note: As with Sort, the Gather node is responsible for reacting + * properly to changes to this parameter. + */ + GatherState *gstate = (GatherState *) child_node; + + gstate->tuples_needed = tuples_needed; + + /* Also pass down the bound to our own copy of the child plan */ + ExecSetTupleBound(tuples_needed, outerPlanState(child_node)); + } + else if (IsA(child_node, GatherMergeState)) + { + /* Same comments as for Gather */ + GatherMergeState *gstate = (GatherMergeState *) child_node; + + gstate->tuples_needed = tuples_needed; + + ExecSetTupleBound(tuples_needed, outerPlanState(child_node)); + } + + /* + * In principle we could descend through any plan node type that is + * certain not to discard or combine input rows; but on seeing a node that + * can do that, we can't propagate the bound any further. For the moment + * it's unclear that any other cases are worth checking here. + */ +} diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c index e8d94ee6f3..a0f5a60d93 100644 --- a/src/backend/executor/nodeGather.c +++ b/src/backend/executor/nodeGather.c @@ -72,6 +72,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags) gatherstate->ps.state = estate; gatherstate->ps.ExecProcNode = ExecGather; gatherstate->need_to_scan_locally = !node->single_copy; + gatherstate->tuples_needed = -1; /* * Miscellaneous initialization @@ -156,7 +157,8 @@ ExecGather(PlanState *pstate) if (!node->pei) node->pei = ExecInitParallelPlan(node->ps.lefttree, estate, - gather->num_workers); + gather->num_workers, + node->tuples_needed); /* * Register backend workers. We might not get as many as we diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c index 64c62398bb..2526c584fd 100644 --- a/src/backend/executor/nodeGatherMerge.c +++ b/src/backend/executor/nodeGatherMerge.c @@ -77,6 +77,7 @@ ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags) gm_state->ps.plan = (Plan *) node; gm_state->ps.state = estate; gm_state->ps.ExecProcNode = ExecGatherMerge; + gm_state->tuples_needed = -1; /* * Miscellaneous initialization @@ -190,7 +191,8 @@ ExecGatherMerge(PlanState *pstate) if (!node->pei) node->pei = ExecInitParallelPlan(node->ps.lefttree, estate, - gm->num_workers); + gm->num_workers, + node->tuples_needed); /* Try to launch workers. */ pcxt = node->pei->pcxt; diff --git a/src/backend/executor/nodeLimit.c b/src/backend/executor/nodeLimit.c index ceb6854b59..883f46ce7c 100644 --- a/src/backend/executor/nodeLimit.c +++ b/src/backend/executor/nodeLimit.c @@ -27,7 +27,7 @@ #include "nodes/nodeFuncs.h" static void recompute_limits(LimitState *node); -static void pass_down_bound(LimitState *node, PlanState *child_node); +static int64 compute_tuples_needed(LimitState *node); /* ---------------------------------------------------------------- @@ -297,92 +297,26 @@ recompute_limits(LimitState *node) /* Set state-machine state */ node->lstate = LIMIT_RESCAN; - /* Notify child node about limit, if useful */ - pass_down_bound(node, outerPlanState(node)); + /* + * Notify child node about limit. Note: think not to "optimize" by + * skipping ExecSetTupleBound if compute_tuples_needed returns < 0. We + * must update the child node anyway, in case this is a rescan and the + * previous time we got a different result. + */ + ExecSetTupleBound(compute_tuples_needed(node), outerPlanState(node)); } /* - * If we have a COUNT, and our input is a Sort node, notify it that it can - * use bounded sort. We can also pass down the bound through plan nodes - * that cannot remove or combine input rows; for example, if our input is a - * MergeAppend, we can apply the same bound to any Sorts that are direct - * children of the MergeAppend, since the MergeAppend surely need not read - * more than that many tuples from any one input. - * - * This is a bit of a kluge, but we don't have any more-abstract way of - * communicating between the two nodes; and it doesn't seem worth trying - * to invent one without some more examples of special communication needs. - * - * Note: it is the responsibility of nodeSort.c to react properly to - * changes of these parameters. If we ever do redesign this, it'd be a - * good idea to integrate this signaling with the parameter-change mechanism. + * Compute the maximum number of tuples needed to satisfy this Limit node. + * Return a negative value if there is not a determinable limit. */ -static void -pass_down_bound(LimitState *node, PlanState *child_node) +static int64 +compute_tuples_needed(LimitState *node) { - /* - * Since this function recurses, in principle we should check stack depth - * here. In practice, it's probably pointless since the earlier node - * initialization tree traversal would surely have consumed more stack. - */ - - if (IsA(child_node, SortState)) - { - SortState *sortState = (SortState *) child_node; - int64 tuples_needed = node->count + node->offset; - - /* negative test checks for overflow in sum */ - if (node->noCount || tuples_needed < 0) - { - /* make sure flag gets reset if needed upon rescan */ - sortState->bounded = false; - } - else - { - sortState->bounded = true; - sortState->bound = tuples_needed; - } - } - else if (IsA(child_node, MergeAppendState)) - { - /* Pass down the bound through MergeAppend */ - MergeAppendState *maState = (MergeAppendState *) child_node; - int i; - - for (i = 0; i < maState->ms_nplans; i++) - pass_down_bound(node, maState->mergeplans[i]); - } - else if (IsA(child_node, ResultState)) - { - /* - * We also have to be prepared to look through a Result, since the - * planner might stick one atop MergeAppend for projection purposes. - * - * If Result supported qual checking, we'd have to punt on seeing a - * qual. Note that having a resconstantqual is not a showstopper: if - * that fails we're not getting any rows at all. - */ - if (outerPlanState(child_node)) - pass_down_bound(node, outerPlanState(child_node)); - } - else if (IsA(child_node, SubqueryScanState)) - { - /* - * We can also look through SubqueryScan, but only if it has no qual - * (otherwise it might discard rows). - */ - SubqueryScanState *subqueryState = (SubqueryScanState *) child_node; - - if (subqueryState->ss.ps.qual == NULL) - pass_down_bound(node, subqueryState->subplan); - } - - /* - * In principle we could look through any plan node type that is certain - * not to discard or combine input rows. In practice, there are not many - * node types that the planner might put between Sort and Limit, so trying - * to be very general is not worth the trouble. - */ + if (node->noCount) + return -1; + /* Note: if this overflows, we'll return a negative value, which is OK */ + return node->count + node->offset; } /* ---------------------------------------------------------------- diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h index bd0a87fa04..79b886706f 100644 --- a/src/include/executor/execParallel.h +++ b/src/include/executor/execParallel.h @@ -33,7 +33,7 @@ typedef struct ParallelExecutorInfo } ParallelExecutorInfo; extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate, - EState *estate, int nworkers); + EState *estate, int nworkers, int64 tuples_needed); extern void ExecParallelFinish(ParallelExecutorInfo *pei); extern void ExecParallelCleanup(ParallelExecutorInfo *pei); extern void ExecParallelReinitialize(ParallelExecutorInfo *pei); diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index eacbea3c36..f48a603dae 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -232,6 +232,7 @@ extern PlanState *ExecInitNode(Plan *node, EState *estate, int eflags); extern Node *MultiExecProcNode(PlanState *node); extern void ExecEndNode(PlanState *node); extern bool ExecShutdownNode(PlanState *node); +extern void ExecSetTupleBound(int64 tuples_needed, PlanState *child_node); /* ---------------------------------------------------------------- diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 3272c4b315..15a84269ec 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1919,6 +1919,7 @@ typedef struct GatherState struct TupleQueueReader **reader; TupleTableSlot *funnel_slot; bool need_to_scan_locally; + int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */ } GatherState; /* ---------------- @@ -1944,6 +1945,7 @@ typedef struct GatherMergeState struct binaryheap *gm_heap; /* binary heap of slot indices */ bool gm_initialized; /* gather merge initilized ? */ bool need_to_scan_locally; + int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */ int gm_nkeys; SortSupport gm_sortkeys; /* array of length ms_nkeys */ struct GMReaderTupleBuffer *gm_tuple_buffers; /* tuple buffer per reader */ diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out index 084f0f0c8e..ccad18e978 100644 --- a/src/test/regress/expected/select_parallel.out +++ b/src/test/regress/expected/select_parallel.out @@ -300,6 +300,29 @@ select count(*) from tenk1 group by twenty; 500 (20 rows) +reset enable_hashagg; +-- gather merge test with a LIMIT +explain (costs off) + select fivethous from tenk1 order by fivethous limit 4; + QUERY PLAN +---------------------------------------------- + Limit + -> Gather Merge + Workers Planned: 4 + -> Sort + Sort Key: fivethous + -> Parallel Seq Scan on tenk1 +(6 rows) + +select fivethous from tenk1 order by fivethous limit 4; + fivethous +----------- + 0 + 0 + 1 + 1 +(4 rows) + -- gather merge test with 0 worker set max_parallel_workers = 0; explain (costs off) @@ -325,7 +348,6 @@ select string4 from tenk1 order by string4 limit 5; (5 rows) reset max_parallel_workers; -reset enable_hashagg; SAVEPOINT settings; SET LOCAL force_parallel_mode = 1; explain (costs off) diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql index 58c3f59890..c0debddbcd 100644 --- a/src/test/regress/sql/select_parallel.sql +++ b/src/test/regress/sql/select_parallel.sql @@ -118,13 +118,20 @@ explain (costs off) select count(*) from tenk1 group by twenty; +reset enable_hashagg; + +-- gather merge test with a LIMIT +explain (costs off) + select fivethous from tenk1 order by fivethous limit 4; + +select fivethous from tenk1 order by fivethous limit 4; + -- gather merge test with 0 worker set max_parallel_workers = 0; explain (costs off) select string4 from tenk1 order by string4 limit 5; select string4 from tenk1 order by string4 limit 5; reset max_parallel_workers; -reset enable_hashagg; SAVEPOINT settings; SET LOCAL force_parallel_mode = 1;