From e5253fdc4f5fe2f38aec47e08c6aee93f934183d Mon Sep 17 00:00:00 2001
From: Robert Haas <rhaas@postgresql.org>
Date: Wed, 15 Nov 2017 08:17:29 -0500
Subject: [PATCH] Add parallel_leader_participation GUC.

Sometimes, for testing, it's useful to have the leader do nothing but
read tuples from workers; and it's possible that could work out better
even in production.

Thomas Munro, reviewed by Amit Kapila and by me.  A few final tweaks
by me.

Discussion: http://postgr.es/m/CAEepm=2U++Lp3bNTv2Bv_kkr5NE2pOyHhxU=G0YTa4ZhSYhHiw@mail.gmail.com
---
 doc/src/sgml/config.sgml                      |  26 ++
 src/backend/executor/nodeGather.c             |   8 +-
 src/backend/executor/nodeGatherMerge.c        |   6 +-
 src/backend/optimizer/path/costsize.c         |  12 +-
 src/backend/optimizer/plan/planner.c          |   1 +
 src/backend/utils/misc/guc.c                  |  10 ++
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/optimizer/planmain.h              |   1 +
 src/test/regress/expected/select_parallel.out | 113 ++++++++++++++++++
 src/test/regress/sql/select_parallel.sql      |  36 ++++++
 10 files changed, 205 insertions(+), 9 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 996e82534a..fc1752fb3f 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4265,6 +4265,32 @@ SELECT * FROM parent WHERE key = 2400;
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-parallel-leader-participation" xreflabel="parallel_leader_participation">
+      <term>
+       <varname>parallel_leader_participation</varname> (<type>boolean</type>)
+       <indexterm>
+        <primary>
+         <varname>parallel_leader_participation</varname> configuration
+         parameter
+        </primary>
+       </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Allows the leader process to execute the query plan under
+        <literal>Gather</literal> and <literal>Gather Merge</literal> nodes
+        instead of waiting for worker processes.  The default is
+        <literal>on</literal>.  Setting this value to <literal>off</literal>
+        reduces the likelihood that workers will become blocked because the
+        leader is not reading tuples fast enough, but requires the leader
+        process to wait for worker processes to start up before the first
+        tuples can be produced.  The degree to which the leader can help or
+        hinder performance depends on the plan type, number of workers and
+        query duration.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-force-parallel-mode" xreflabel="force_parallel_mode">
       <term><varname>force_parallel_mode</varname> (<type>enum</type>)
       <indexterm>
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 639f4f5af8..0298c65d06 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -38,6 +38,7 @@
 #include "executor/nodeSubplan.h"
 #include "executor/tqueue.h"
 #include "miscadmin.h"
+#include "optimizer/planmain.h"
 #include "pgstat.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
@@ -73,7 +74,8 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	gatherstate->ps.ExecProcNode = ExecGather;
 
 	gatherstate->initialized = false;
-	gatherstate->need_to_scan_locally = !node->single_copy;
+	gatherstate->need_to_scan_locally =
+		!node->single_copy && parallel_leader_participation;
 	gatherstate->tuples_needed = -1;
 
 	/*
@@ -193,9 +195,9 @@ ExecGather(PlanState *pstate)
 			node->nextreader = 0;
 		}
 
-		/* Run plan locally if no workers or not single-copy. */
+		/* Run plan locally if no workers or enabled and not single-copy. */
 		node->need_to_scan_locally = (node->nreaders == 0)
-			|| !gather->single_copy;
+			|| (!gather->single_copy && parallel_leader_participation);
 
 		node->initialized = true;
 	}
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index 5625b12521..7206ab9197 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -23,6 +23,7 @@
 #include "executor/tqueue.h"
 #include "lib/binaryheap.h"
 #include "miscadmin.h"
+#include "optimizer/planmain.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
@@ -233,8 +234,9 @@ ExecGatherMerge(PlanState *pstate)
 			}
 		}
 
-		/* always allow leader to participate */
-		node->need_to_scan_locally = true;
+		/* allow leader to participate if enabled or no choice */
+		if (parallel_leader_participation || node->nreaders == 0)
+			node->need_to_scan_locally = true;
 
 		node->initialized = true;
 	}
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 2d2df60886..d11bf19e30 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -5137,7 +5137,6 @@ static double
 get_parallel_divisor(Path *path)
 {
 	double		parallel_divisor = path->parallel_workers;
-	double		leader_contribution;
 
 	/*
 	 * Early experience with parallel query suggests that when there is only
@@ -5150,9 +5149,14 @@ get_parallel_divisor(Path *path)
 	 * its time servicing each worker, and the remainder executing the
 	 * parallel plan.
 	 */
-	leader_contribution = 1.0 - (0.3 * path->parallel_workers);
-	if (leader_contribution > 0)
-		parallel_divisor += leader_contribution;
+	if (parallel_leader_participation)
+	{
+		double		leader_contribution;
+
+		leader_contribution = 1.0 - (0.3 * path->parallel_workers);
+		if (leader_contribution > 0)
+			parallel_divisor += leader_contribution;
+	}
 
 	return parallel_divisor;
 }
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 90fd9cc959..4c00a1453b 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -61,6 +61,7 @@
 /* GUC parameters */
 double		cursor_tuple_fraction = DEFAULT_CURSOR_TUPLE_FRACTION;
 int			force_parallel_mode = FORCE_PARALLEL_OFF;
+bool		parallel_leader_participation = true;
 
 /* Hook for plugins to get control in planner() */
 planner_hook_type planner_hook = NULL;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index c4c1afa084..6dcd738be6 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1676,6 +1676,16 @@ static struct config_bool ConfigureNamesBool[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"parallel_leader_participation", PGC_USERSET, RESOURCES_ASYNCHRONOUS,
+			gettext_noop("Controls whether Gather and Gather Merge also run subplans."),
+			gettext_noop("Should gather nodes also run subplans, or just gather tuples?")
+		},
+		&parallel_leader_participation,
+		true,
+		NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 368b280c8a..c7cd72ade2 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -163,6 +163,7 @@
 #effective_io_concurrency = 1		# 1-1000; 0 disables prefetching
 #max_worker_processes = 8		# (change requires restart)
 #max_parallel_workers_per_gather = 2	# taken from max_parallel_workers
+#parallel_leader_participation = on
 #max_parallel_workers = 8		# maximum number of max_worker_processes that
 					# can be used in parallel queries
 #old_snapshot_threshold = -1		# 1min-60d; -1 disables; 0 is immediate
diff --git a/src/include/optimizer/planmain.h b/src/include/optimizer/planmain.h
index f1d16cffab..d6133228bd 100644
--- a/src/include/optimizer/planmain.h
+++ b/src/include/optimizer/planmain.h
@@ -29,6 +29,7 @@ typedef enum
 #define DEFAULT_CURSOR_TUPLE_FRACTION 0.1
 extern double cursor_tuple_fraction;
 extern int	force_parallel_mode;
+extern bool parallel_leader_participation;
 
 /* query_planner callback to compute query_pathkeys */
 typedef void (*query_pathkeys_callback) (PlannerInfo *root, void *extra);
diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out
index 63ed6a33c1..06aeddd805 100644
--- a/src/test/regress/expected/select_parallel.out
+++ b/src/test/regress/expected/select_parallel.out
@@ -34,6 +34,49 @@ select count(*) from a_star;
     50
 (1 row)
 
+-- test with leader participation disabled
+set parallel_leader_participation = off;
+explain (costs off)
+  select count(*) from tenk1 where stringu1 = 'GRAAAA';
+                        QUERY PLAN
+---------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 4
+         ->  Partial Aggregate
+               ->  Parallel Seq Scan on tenk1
+                     Filter: (stringu1 = 'GRAAAA'::name)
+(6 rows)
+
+select count(*) from tenk1 where stringu1 = 'GRAAAA';
+ count
+-------
+    15
+(1 row)
+
+-- test with leader participation disabled, but no workers available (so
+-- the leader will have to run the plan despite the setting)
+set max_parallel_workers = 0;
+explain (costs off)
+  select count(*) from tenk1 where stringu1 = 'GRAAAA';
+                        QUERY PLAN
+---------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 4
+         ->  Partial Aggregate
+               ->  Parallel Seq Scan on tenk1
+                     Filter: (stringu1 = 'GRAAAA'::name)
+(6 rows)
+
+select count(*) from tenk1 where stringu1 = 'GRAAAA';
+ count
+-------
+    15
+(1 row)
+
+reset max_parallel_workers;
+reset parallel_leader_participation;
 -- test that parallel_restricted function doesn't run in worker
 alter table tenk1 set (parallel_workers = 4);
 explain (verbose, costs off)
@@ -400,6 +443,49 @@ explain (costs off, verbose)
 (11 rows)
 
 drop function simple_func(integer);
+-- test gather merge with parallel leader participation disabled
+set parallel_leader_participation = off;
+explain (costs off)
+  select count(*) from tenk1 group by twenty;
+                     QUERY PLAN
+----------------------------------------------------
+ Finalize GroupAggregate
+   Group Key: twenty
+   ->  Gather Merge
+         Workers Planned: 4
+         ->  Partial GroupAggregate
+               Group Key: twenty
+               ->  Sort
+                     Sort Key: twenty
+                     ->  Parallel Seq Scan on tenk1
+(9 rows)
+
+select count(*) from tenk1 group by twenty;
+ count
+-------
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+(20 rows)
+
+reset parallel_leader_participation;
 --test rescan behavior of gather merge
 set enable_material = false;
 explain (costs off)
@@ -508,6 +594,33 @@ select string4 from tenk1 order by string4 limit 5;
 AAAAxx
 (5 rows)
 
+-- gather merge test with 0 workers, with parallel leader
+-- participation disabled (the leader will have to run the plan
+-- despite the setting)
+set parallel_leader_participation = off;
+explain (costs off)
+  select string4 from tenk1 order by string4 limit 5;
+                  QUERY PLAN
+----------------------------------------------
+ Limit
+   ->  Gather Merge
+         Workers Planned: 4
+         ->  Sort
+               Sort Key: string4
+               ->  Parallel Seq Scan on tenk1
+(6 rows)
+
+select string4 from tenk1 order by string4 limit 5;
+ string4
+---------
+ AAAAxx
+ AAAAxx
+ AAAAxx
+ AAAAxx
+ AAAAxx
+(5 rows)
+
+reset parallel_leader_participation;
 reset max_parallel_workers;
 SAVEPOINT settings;
 SET LOCAL force_parallel_mode = 1;
diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql
index 1bd2821083..b701b35408 100644
--- a/src/test/regress/sql/select_parallel.sql
+++ b/src/test/regress/sql/select_parallel.sql
@@ -19,6 +19,22 @@ explain (costs off)
   select count(*) from a_star;
 select count(*) from a_star;
 
+-- test with leader participation disabled
+set parallel_leader_participation = off;
+explain (costs off)
+  select count(*) from tenk1 where stringu1 = 'GRAAAA';
+select count(*) from tenk1 where stringu1 = 'GRAAAA';
+
+-- test with leader participation disabled, but no workers available (so
+-- the leader will have to run the plan despite the setting)
+set max_parallel_workers = 0;
+explain (costs off)
+  select count(*) from tenk1 where stringu1 = 'GRAAAA';
+select count(*) from tenk1 where stringu1 = 'GRAAAA';
+
+reset max_parallel_workers;
+reset parallel_leader_participation;
+
 -- test that parallel_restricted function doesn't run in worker
 alter table tenk1 set (parallel_workers = 4);
 explain (verbose, costs off)
@@ -157,6 +173,16 @@ explain (costs off, verbose)
 
 drop function simple_func(integer);
 
+-- test gather merge with parallel leader participation disabled
+set parallel_leader_participation = off;
+
+explain (costs off)
+  select count(*) from tenk1 group by twenty;
+
+select count(*) from tenk1 group by twenty;
+
+reset parallel_leader_participation;
+
 --test rescan behavior of gather merge
 set enable_material = false;
 explain (costs off)
@@ -192,6 +218,16 @@ set max_parallel_workers = 0;
 explain (costs off)
   select string4 from tenk1 order by string4 limit 5;
 select string4 from tenk1 order by string4 limit 5;
+
+-- gather merge test with 0 workers, with parallel leader
+-- participation disabled (the leader will have to run the plan
+-- despite the setting)
+set parallel_leader_participation = off;
+explain (costs off)
+  select string4 from tenk1 order by string4 limit 5;
+select string4 from tenk1 order by string4 limit 5;
+
+reset parallel_leader_participation;
 reset max_parallel_workers;
 SAVEPOINT settings;
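
For anyone who wants to try the new GUC interactively rather than through the regression suite, here is a minimal psql sketch. It is not part of the patch; "big_table" is a placeholder for any table large enough that the planner chooses a parallel plan.

-- Usage sketch (assumption: "big_table" is a stand-in name for a table that
-- gets a parallel seq scan under the current cost settings).
SET max_parallel_workers_per_gather = 2;

-- Default behavior: the leader also executes the parallel subplan.
SET parallel_leader_participation = on;
EXPLAIN (ANALYZE, COSTS OFF) SELECT count(*) FROM big_table;

-- Leader only gathers tuples from workers; per the executor changes above,
-- it still runs the plan itself if no workers could be launched.
SET parallel_leader_participation = off;
EXPLAIN (ANALYZE, COSTS OFF) SELECT count(*) FROM big_table;

RESET parallel_leader_participation;
RESET max_parallel_workers_per_gather;

Comparing the two ANALYZE outputs shows the leader's share of rows disappearing from the Gather node when the setting is off, which is the behavior the costsize.c change models.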