From 3452dc5240da43e833118484e1e9b4894d04431c Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Tue, 29 Aug 2017 13:12:23 -0400 Subject: [PATCH] Push tuple limits through Gather and Gather Merge. If we only need, say, 10 tuples in total, then we certainly don't need more than 10 tuples from any single process. Pushing down the limit lets workers exit early when possible. For Gather Merge, there is an additional benefit: a Sort immediately below the Gather Merge can be done as a bounded sort if there is an applicable limit. Robert Haas and Tom Lane Discussion: http://postgr.es/m/CA+TgmoYa3QKKrLj5rX7UvGqhH73G1Li4B-EKxrmASaca2tFu9Q@mail.gmail.com --- src/backend/executor/execParallel.c | 54 ++++++-- src/backend/executor/execProcnode.c | 121 ++++++++++++++++++ src/backend/executor/nodeGather.c | 4 +- src/backend/executor/nodeGatherMerge.c | 4 +- src/backend/executor/nodeLimit.c | 98 +++----------- src/include/executor/execParallel.h | 2 +- src/include/executor/executor.h | 1 + src/include/nodes/execnodes.h | 2 + src/test/regress/expected/select_parallel.out | 24 +++- src/test/regress/sql/select_parallel.sql | 9 +- 10 files changed, 222 insertions(+), 97 deletions(-) diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index ce47f1d4a8..ad9eba63dd 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -47,16 +47,25 @@ * greater than any 32-bit integer here so that values < 2^32 can be used * by individual parallel nodes to store their own state. */ -#define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000001) -#define PARALLEL_KEY_PARAMS UINT64CONST(0xE000000000000002) -#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000003) -#define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000004) -#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000005) -#define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000006) -#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000007) +#define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001) +#define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002) +#define PARALLEL_KEY_PARAMS UINT64CONST(0xE000000000000003) +#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004) +#define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005) +#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006) +#define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007) +#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008) #define PARALLEL_TUPLE_QUEUE_SIZE 65536 +/* + * Fixed-size random stuff that we need to pass to parallel workers. + */ +typedef struct FixedParallelExecutorState +{ + int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */ +} FixedParallelExecutorState; + /* * DSM structure for accumulating per-PlanState instrumentation. * @@ -381,12 +390,14 @@ ExecParallelReinitialize(ParallelExecutorInfo *pei) * execution and return results to the main backend. */ ParallelExecutorInfo * -ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) +ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers, + int64 tuples_needed) { ParallelExecutorInfo *pei; ParallelContext *pcxt; ExecParallelEstimateContext e; ExecParallelInitializeDSMContext d; + FixedParallelExecutorState *fpes; char *pstmt_data; char *pstmt_space; char *param_space; @@ -418,6 +429,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) * for the various things we need to store. */ + /* Estimate space for fixed-size state. */ + shm_toc_estimate_chunk(&pcxt->estimator, + sizeof(FixedParallelExecutorState)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for query text. */ query_len = strlen(estate->es_sourceText); shm_toc_estimate_chunk(&pcxt->estimator, query_len); @@ -487,6 +503,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) * asked for has been allocated or initialized yet, though, so do that. */ + /* Store fixed-size state. */ + fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState)); + fpes->tuples_needed = tuples_needed; + shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes); + /* Store query string */ query_string = shm_toc_allocate(pcxt->toc, query_len); memcpy(query_string, estate->es_sourceText, query_len); @@ -833,6 +854,7 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc) void ParallelQueryMain(dsm_segment *seg, shm_toc *toc) { + FixedParallelExecutorState *fpes; BufferUsage *buffer_usage; DestReceiver *receiver; QueryDesc *queryDesc; @@ -841,6 +863,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) void *area_space; dsa_area *area; + /* Get fixed-size state. */ + fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false); + /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */ receiver = ExecParallelGetReceiver(seg, toc); instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true); @@ -868,8 +893,17 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) queryDesc->planstate->state->es_query_dsa = area; ExecParallelInitializeWorker(queryDesc->planstate, toc); - /* Run the plan */ - ExecutorRun(queryDesc, ForwardScanDirection, 0L, true); + /* Pass down any tuple bound */ + ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate); + + /* + * Run the plan. If we specified a tuple bound, be careful not to demand + * more tuples than that. + */ + ExecutorRun(queryDesc, + ForwardScanDirection, + fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed, + true); /* Shut down the executor */ ExecutorFinish(queryDesc); diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index 36d2914249..c1aa5064c9 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -757,3 +757,124 @@ ExecShutdownNode(PlanState *node) return false; } + +/* + * ExecSetTupleBound + * + * Set a tuple bound for a planstate node. This lets child plan nodes + * optimize based on the knowledge that the maximum number of tuples that + * their parent will demand is limited. The tuple bound for a node may + * only be changed between scans (i.e., after node initialization or just + * before an ExecReScan call). + * + * Any negative tuples_needed value means "no limit", which should be the + * default assumption when this is not called at all for a particular node. + * + * Note: if this is called repeatedly on a plan tree, the exact same set + * of nodes must be updated with the new limit each time; be careful that + * only unchanging conditions are tested here. + */ +void +ExecSetTupleBound(int64 tuples_needed, PlanState *child_node) +{ + /* + * Since this function recurses, in principle we should check stack depth + * here. In practice, it's probably pointless since the earlier node + * initialization tree traversal would surely have consumed more stack. + */ + + if (IsA(child_node, SortState)) + { + /* + * If it is a Sort node, notify it that it can use bounded sort. + * + * Note: it is the responsibility of nodeSort.c to react properly to + * changes of these parameters. If we ever redesign this, it'd be a + * good idea to integrate this signaling with the parameter-change + * mechanism. + */ + SortState *sortState = (SortState *) child_node; + + if (tuples_needed < 0) + { + /* make sure flag gets reset if needed upon rescan */ + sortState->bounded = false; + } + else + { + sortState->bounded = true; + sortState->bound = tuples_needed; + } + } + else if (IsA(child_node, MergeAppendState)) + { + /* + * If it is a MergeAppend, we can apply the bound to any nodes that + * are children of the MergeAppend, since the MergeAppend surely need + * read no more than that many tuples from any one input. + */ + MergeAppendState *maState = (MergeAppendState *) child_node; + int i; + + for (i = 0; i < maState->ms_nplans; i++) + ExecSetTupleBound(tuples_needed, maState->mergeplans[i]); + } + else if (IsA(child_node, ResultState)) + { + /* + * Similarly, for a projecting Result, we can apply the bound to its + * child node. + * + * If Result supported qual checking, we'd have to punt on seeing a + * qual. Note that having a resconstantqual is not a showstopper: if + * that condition succeeds it affects nothing, while if it fails, no + * rows will be demanded from the Result child anyway. + */ + if (outerPlanState(child_node)) + ExecSetTupleBound(tuples_needed, outerPlanState(child_node)); + } + else if (IsA(child_node, SubqueryScanState)) + { + /* + * We can also descend through SubqueryScan, but only if it has no + * qual (otherwise it might discard rows). + */ + SubqueryScanState *subqueryState = (SubqueryScanState *) child_node; + + if (subqueryState->ss.ps.qual == NULL) + ExecSetTupleBound(tuples_needed, subqueryState->subplan); + } + else if (IsA(child_node, GatherState)) + { + /* + * A Gather node can propagate the bound to its workers. As with + * MergeAppend, no one worker could possibly need to return more + * tuples than the Gather itself needs to. + * + * Note: As with Sort, the Gather node is responsible for reacting + * properly to changes to this parameter. + */ + GatherState *gstate = (GatherState *) child_node; + + gstate->tuples_needed = tuples_needed; + + /* Also pass down the bound to our own copy of the child plan */ + ExecSetTupleBound(tuples_needed, outerPlanState(child_node)); + } + else if (IsA(child_node, GatherMergeState)) + { + /* Same comments as for Gather */ + GatherMergeState *gstate = (GatherMergeState *) child_node; + + gstate->tuples_needed = tuples_needed; + + ExecSetTupleBound(tuples_needed, outerPlanState(child_node)); + } + + /* + * In principle we could descend through any plan node type that is + * certain not to discard or combine input rows; but on seeing a node that + * can do that, we can't propagate the bound any further. For the moment + * it's unclear that any other cases are worth checking here. + */ +} diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c index e8d94ee6f3..a0f5a60d93 100644 --- a/src/backend/executor/nodeGather.c +++ b/src/backend/executor/nodeGather.c @@ -72,6 +72,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags) gatherstate->ps.state = estate; gatherstate->ps.ExecProcNode = ExecGather; gatherstate->need_to_scan_locally = !node->single_copy; + gatherstate->tuples_needed = -1; /* * Miscellaneous initialization @@ -156,7 +157,8 @@ ExecGather(PlanState *pstate) if (!node->pei) node->pei = ExecInitParallelPlan(node->ps.lefttree, estate, - gather->num_workers); + gather->num_workers, + node->tuples_needed); /* * Register backend workers. We might not get as many as we diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c index 64c62398bb..2526c584fd 100644 --- a/src/backend/executor/nodeGatherMerge.c +++ b/src/backend/executor/nodeGatherMerge.c @@ -77,6 +77,7 @@ ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags) gm_state->ps.plan = (Plan *) node; gm_state->ps.state = estate; gm_state->ps.ExecProcNode = ExecGatherMerge; + gm_state->tuples_needed = -1; /* * Miscellaneous initialization @@ -190,7 +191,8 @@ ExecGatherMerge(PlanState *pstate) if (!node->pei) node->pei = ExecInitParallelPlan(node->ps.lefttree, estate, - gm->num_workers); + gm->num_workers, + node->tuples_needed); /* Try to launch workers. */ pcxt = node->pei->pcxt; diff --git a/src/backend/executor/nodeLimit.c b/src/backend/executor/nodeLimit.c index ceb6854b59..883f46ce7c 100644 --- a/src/backend/executor/nodeLimit.c +++ b/src/backend/executor/nodeLimit.c @@ -27,7 +27,7 @@ #include "nodes/nodeFuncs.h" static void recompute_limits(LimitState *node); -static void pass_down_bound(LimitState *node, PlanState *child_node); +static int64 compute_tuples_needed(LimitState *node); /* ---------------------------------------------------------------- @@ -297,92 +297,26 @@ recompute_limits(LimitState *node) /* Set state-machine state */ node->lstate = LIMIT_RESCAN; - /* Notify child node about limit, if useful */ - pass_down_bound(node, outerPlanState(node)); + /* + * Notify child node about limit. Note: think not to "optimize" by + * skipping ExecSetTupleBound if compute_tuples_needed returns < 0. We + * must update the child node anyway, in case this is a rescan and the + * previous time we got a different result. + */ + ExecSetTupleBound(compute_tuples_needed(node), outerPlanState(node)); } /* - * If we have a COUNT, and our input is a Sort node, notify it that it can - * use bounded sort. We can also pass down the bound through plan nodes - * that cannot remove or combine input rows; for example, if our input is a - * MergeAppend, we can apply the same bound to any Sorts that are direct - * children of the MergeAppend, since the MergeAppend surely need not read - * more than that many tuples from any one input. - * - * This is a bit of a kluge, but we don't have any more-abstract way of - * communicating between the two nodes; and it doesn't seem worth trying - * to invent one without some more examples of special communication needs. - * - * Note: it is the responsibility of nodeSort.c to react properly to - * changes of these parameters. If we ever do redesign this, it'd be a - * good idea to integrate this signaling with the parameter-change mechanism. + * Compute the maximum number of tuples needed to satisfy this Limit node. + * Return a negative value if there is not a determinable limit. */ -static void -pass_down_bound(LimitState *node, PlanState *child_node) +static int64 +compute_tuples_needed(LimitState *node) { - /* - * Since this function recurses, in principle we should check stack depth - * here. In practice, it's probably pointless since the earlier node - * initialization tree traversal would surely have consumed more stack. - */ - - if (IsA(child_node, SortState)) - { - SortState *sortState = (SortState *) child_node; - int64 tuples_needed = node->count + node->offset; - - /* negative test checks for overflow in sum */ - if (node->noCount || tuples_needed < 0) - { - /* make sure flag gets reset if needed upon rescan */ - sortState->bounded = false; - } - else - { - sortState->bounded = true; - sortState->bound = tuples_needed; - } - } - else if (IsA(child_node, MergeAppendState)) - { - /* Pass down the bound through MergeAppend */ - MergeAppendState *maState = (MergeAppendState *) child_node; - int i; - - for (i = 0; i < maState->ms_nplans; i++) - pass_down_bound(node, maState->mergeplans[i]); - } - else if (IsA(child_node, ResultState)) - { - /* - * We also have to be prepared to look through a Result, since the - * planner might stick one atop MergeAppend for projection purposes. - * - * If Result supported qual checking, we'd have to punt on seeing a - * qual. Note that having a resconstantqual is not a showstopper: if - * that fails we're not getting any rows at all. - */ - if (outerPlanState(child_node)) - pass_down_bound(node, outerPlanState(child_node)); - } - else if (IsA(child_node, SubqueryScanState)) - { - /* - * We can also look through SubqueryScan, but only if it has no qual - * (otherwise it might discard rows). - */ - SubqueryScanState *subqueryState = (SubqueryScanState *) child_node; - - if (subqueryState->ss.ps.qual == NULL) - pass_down_bound(node, subqueryState->subplan); - } - - /* - * In principle we could look through any plan node type that is certain - * not to discard or combine input rows. In practice, there are not many - * node types that the planner might put between Sort and Limit, so trying - * to be very general is not worth the trouble. - */ + if (node->noCount) + return -1; + /* Note: if this overflows, we'll return a negative value, which is OK */ + return node->count + node->offset; } /* ---------------------------------------------------------------- diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h index bd0a87fa04..79b886706f 100644 --- a/src/include/executor/execParallel.h +++ b/src/include/executor/execParallel.h @@ -33,7 +33,7 @@ typedef struct ParallelExecutorInfo } ParallelExecutorInfo; extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate, - EState *estate, int nworkers); + EState *estate, int nworkers, int64 tuples_needed); extern void ExecParallelFinish(ParallelExecutorInfo *pei); extern void ExecParallelCleanup(ParallelExecutorInfo *pei); extern void ExecParallelReinitialize(ParallelExecutorInfo *pei); diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index eacbea3c36..f48a603dae 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -232,6 +232,7 @@ extern PlanState *ExecInitNode(Plan *node, EState *estate, int eflags); extern Node *MultiExecProcNode(PlanState *node); extern void ExecEndNode(PlanState *node); extern bool ExecShutdownNode(PlanState *node); +extern void ExecSetTupleBound(int64 tuples_needed, PlanState *child_node); /* ---------------------------------------------------------------- diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 3272c4b315..15a84269ec 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1919,6 +1919,7 @@ typedef struct GatherState struct TupleQueueReader **reader; TupleTableSlot *funnel_slot; bool need_to_scan_locally; + int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */ } GatherState; /* ---------------- @@ -1944,6 +1945,7 @@ typedef struct GatherMergeState struct binaryheap *gm_heap; /* binary heap of slot indices */ bool gm_initialized; /* gather merge initilized ? */ bool need_to_scan_locally; + int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */ int gm_nkeys; SortSupport gm_sortkeys; /* array of length ms_nkeys */ struct GMReaderTupleBuffer *gm_tuple_buffers; /* tuple buffer per reader */ diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out index 084f0f0c8e..ccad18e978 100644 --- a/src/test/regress/expected/select_parallel.out +++ b/src/test/regress/expected/select_parallel.out @@ -300,6 +300,29 @@ select count(*) from tenk1 group by twenty; 500 (20 rows) +reset enable_hashagg; +-- gather merge test with a LIMIT +explain (costs off) + select fivethous from tenk1 order by fivethous limit 4; + QUERY PLAN +---------------------------------------------- + Limit + -> Gather Merge + Workers Planned: 4 + -> Sort + Sort Key: fivethous + -> Parallel Seq Scan on tenk1 +(6 rows) + +select fivethous from tenk1 order by fivethous limit 4; + fivethous +----------- + 0 + 0 + 1 + 1 +(4 rows) + -- gather merge test with 0 worker set max_parallel_workers = 0; explain (costs off) @@ -325,7 +348,6 @@ select string4 from tenk1 order by string4 limit 5; (5 rows) reset max_parallel_workers; -reset enable_hashagg; SAVEPOINT settings; SET LOCAL force_parallel_mode = 1; explain (costs off) diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql index 58c3f59890..c0debddbcd 100644 --- a/src/test/regress/sql/select_parallel.sql +++ b/src/test/regress/sql/select_parallel.sql @@ -118,13 +118,20 @@ explain (costs off) select count(*) from tenk1 group by twenty; +reset enable_hashagg; + +-- gather merge test with a LIMIT +explain (costs off) + select fivethous from tenk1 order by fivethous limit 4; + +select fivethous from tenk1 order by fivethous limit 4; + -- gather merge test with 0 worker set max_parallel_workers = 0; explain (costs off) select string4 from tenk1 order by string4 limit 5; select string4 from tenk1 order by string4 limit 5; reset max_parallel_workers; -reset enable_hashagg; SAVEPOINT settings; SET LOCAL force_parallel_mode = 1;