Allow parallel DISTINCT

We've supported parallel aggregation since e06a38965.  At the time, we
didn't quite get around to also adding parallel DISTINCT. So, let's do
that now.

This is implemented by introducing a two-phase DISTINCT.  Phase 1 is
performed on parallel workers, rows are made distinct there either by
hashing or by sort/unique.  The results from the parallel workers are
combined and the final distinct phase is performed serially to get rid of
any duplicate rows that appear due to combining rows for each of the
parallel workers.

Author: David Rowley
Reviewed-by: Zhihong Yu
Discussion: https://postgr.es/m/CAApHDvrjRxVKwQN0he79xS+9wyotFXL=RmoWqGGO2N45Farpgw@mail.gmail.com
This commit is contained in:
David Rowley 2021-08-22 23:31:16 +12:00
parent 26ae660903
commit 22c4e88ebf
5 changed files with 292 additions and 33 deletions

View File

@ -1015,6 +1015,7 @@ UPPERREL_SETOP result of UNION/INTERSECT/EXCEPT, if any
UPPERREL_PARTIAL_GROUP_AGG result of partial grouping/aggregation, if any
UPPERREL_GROUP_AGG result of grouping/aggregation, if any
UPPERREL_WINDOW result of window functions, if any
UPPERREL_PARTIAL_DISTINCT result of partial "SELECT DISTINCT", if any
UPPERREL_DISTINCT result of "SELECT DISTINCT", if any
UPPERREL_ORDERED result of ORDER BY, if any
UPPERREL_FINAL result of any remaining top-level actions

View File

@ -189,6 +189,12 @@ static void create_one_window_path(PlannerInfo *root,
List *activeWindows);
static RelOptInfo *create_distinct_paths(PlannerInfo *root,
RelOptInfo *input_rel);
static void create_partial_distinct_paths(PlannerInfo *root,
RelOptInfo *input_rel,
RelOptInfo *final_distinct_rel);
static RelOptInfo *create_final_distinct_paths(PlannerInfo *root,
RelOptInfo *input_rel,
RelOptInfo *distinct_rel);
static RelOptInfo *create_ordered_paths(PlannerInfo *root,
RelOptInfo *input_rel,
PathTarget *target,
@ -1570,6 +1576,7 @@ grouping_planner(PlannerInfo *root, double tuple_fraction)
*/
root->upper_targets[UPPERREL_FINAL] = final_target;
root->upper_targets[UPPERREL_ORDERED] = final_target;
root->upper_targets[UPPERREL_PARTIAL_DISTINCT] = sort_input_target;
root->upper_targets[UPPERREL_DISTINCT] = sort_input_target;
root->upper_targets[UPPERREL_WINDOW] = sort_input_target;
root->upper_targets[UPPERREL_GROUP_AGG] = grouping_target;
@ -4227,16 +4234,9 @@ create_one_window_path(PlannerInfo *root,
* Sort/Unique won't project anything.
*/
static RelOptInfo *
create_distinct_paths(PlannerInfo *root,
RelOptInfo *input_rel)
create_distinct_paths(PlannerInfo *root, RelOptInfo *input_rel)
{
Query *parse = root->parse;
Path *cheapest_input_path = input_rel->cheapest_total_path;
RelOptInfo *distinct_rel;
double numDistinctRows;
bool allow_hash;
Path *path;
ListCell *lc;
/* For now, do all work in the (DISTINCT, NULL) upperrel */
distinct_rel = fetch_upper_rel(root, UPPERREL_DISTINCT, NULL);
@ -4258,6 +4258,184 @@ create_distinct_paths(PlannerInfo *root,
distinct_rel->useridiscurrent = input_rel->useridiscurrent;
distinct_rel->fdwroutine = input_rel->fdwroutine;
/* build distinct paths based on input_rel's pathlist */
create_final_distinct_paths(root, input_rel, distinct_rel);
/* now build distinct paths based on input_rel's partial_pathlist */
create_partial_distinct_paths(root, input_rel, distinct_rel);
/* Give a helpful error if we failed to create any paths */
if (distinct_rel->pathlist == NIL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("could not implement DISTINCT"),
errdetail("Some of the datatypes only support hashing, while others only support sorting.")));
/*
* If there is an FDW that's responsible for all baserels of the query,
* let it consider adding ForeignPaths.
*/
if (distinct_rel->fdwroutine &&
distinct_rel->fdwroutine->GetForeignUpperPaths)
distinct_rel->fdwroutine->GetForeignUpperPaths(root,
UPPERREL_DISTINCT,
input_rel,
distinct_rel,
NULL);
/* Let extensions possibly add some more paths */
if (create_upper_paths_hook)
(*create_upper_paths_hook) (root, UPPERREL_DISTINCT, input_rel,
distinct_rel, NULL);
/* Now choose the best path(s) */
set_cheapest(distinct_rel);
return distinct_rel;
}
/*
* create_partial_distinct_paths
*
* Process 'input_rel' partial paths and add unique/aggregate paths to the
* UPPERREL_PARTIAL_DISTINCT rel. For paths created, add Gather/GatherMerge
* paths on top and add a final unique/aggregate path to remove any duplicate
* produced from combining rows from parallel workers.
*/
static void
create_partial_distinct_paths(PlannerInfo *root, RelOptInfo *input_rel,
RelOptInfo *final_distinct_rel)
{
RelOptInfo *partial_distinct_rel;
Query *parse;
List *distinctExprs;
double numDistinctRows;
Path *cheapest_partial_path;
ListCell *lc;
/* nothing to do when there are no partial paths in the input rel */
if (!input_rel->consider_parallel || input_rel->partial_pathlist == NIL)
return;
parse = root->parse;
/* can't do parallel DISTINCT ON */
if (parse->hasDistinctOn)
return;
partial_distinct_rel = fetch_upper_rel(root, UPPERREL_PARTIAL_DISTINCT,
NULL);
partial_distinct_rel->reltarget = root->upper_targets[UPPERREL_PARTIAL_DISTINCT];
partial_distinct_rel->consider_parallel = input_rel->consider_parallel;
/*
* If input_rel belongs to a single FDW, so does the partial_distinct_rel.
*/
partial_distinct_rel->serverid = input_rel->serverid;
partial_distinct_rel->userid = input_rel->userid;
partial_distinct_rel->useridiscurrent = input_rel->useridiscurrent;
partial_distinct_rel->fdwroutine = input_rel->fdwroutine;
cheapest_partial_path = linitial(input_rel->partial_pathlist);
distinctExprs = get_sortgrouplist_exprs(parse->distinctClause,
parse->targetList);
/* estimate how many distinct rows we'll get from each worker */
numDistinctRows = estimate_num_groups(root, distinctExprs,
cheapest_partial_path->rows,
NULL, NULL);
/* first try adding unique paths atop of sorted paths */
if (grouping_is_sortable(parse->distinctClause))
{
foreach(lc, input_rel->partial_pathlist)
{
Path *path = (Path *) lfirst(lc);
if (pathkeys_contained_in(root->distinct_pathkeys, path->pathkeys))
{
add_partial_path(partial_distinct_rel, (Path *)
create_upper_unique_path(root,
partial_distinct_rel,
path,
list_length(root->distinct_pathkeys),
numDistinctRows));
}
}
}
/*
* Now try hash aggregate paths, if enabled and hashing is possible. Since
* we're not on the hook to ensure we do our best to create at least one
* path here, we treat enable_hashagg as a hard off-switch rather than the
* slightly softer variant in create_final_distinct_paths.
*/
if (enable_hashagg && grouping_is_hashable(parse->distinctClause))
{
add_partial_path(partial_distinct_rel, (Path *)
create_agg_path(root,
partial_distinct_rel,
cheapest_partial_path,
cheapest_partial_path->pathtarget,
AGG_HASHED,
AGGSPLIT_SIMPLE,
parse->distinctClause,
NIL,
NULL,
numDistinctRows));
}
/*
* If there is an FDW that's responsible for all baserels of the query,
* let it consider adding ForeignPaths.
*/
if (partial_distinct_rel->fdwroutine &&
partial_distinct_rel->fdwroutine->GetForeignUpperPaths)
partial_distinct_rel->fdwroutine->GetForeignUpperPaths(root,
UPPERREL_PARTIAL_DISTINCT,
input_rel,
partial_distinct_rel,
NULL);
/* Let extensions possibly add some more partial paths */
if (create_upper_paths_hook)
(*create_upper_paths_hook) (root, UPPERREL_PARTIAL_DISTINCT,
input_rel, partial_distinct_rel, NULL);
if (partial_distinct_rel->partial_pathlist != NIL)
{
generate_gather_paths(root, partial_distinct_rel, true);
set_cheapest(partial_distinct_rel);
/*
* Finally, create paths to distinctify the final result. This step
* is needed to remove any duplicates due to combining rows from
* parallel workers.
*/
create_final_distinct_paths(root, partial_distinct_rel,
final_distinct_rel);
}
}
/*
* create_final_distinct_paths
* Create distinct paths in 'distinct_rel' based on 'input_rel' pathlist
*
* input_rel: contains the source-data paths
* distinct_rel: destination relation for storing created paths
*/
static RelOptInfo *
create_final_distinct_paths(PlannerInfo *root, RelOptInfo *input_rel,
RelOptInfo *distinct_rel)
{
Query *parse = root->parse;
Path *cheapest_input_path = input_rel->cheapest_total_path;
double numDistinctRows;
bool allow_hash;
Path *path;
ListCell *lc;
/* Estimate number of distinct rows there will be */
if (parse->groupClause || parse->groupingSets || parse->hasAggs ||
root->hasHavingQual)
@ -4384,31 +4562,6 @@ create_distinct_paths(PlannerInfo *root,
numDistinctRows));
}
/* Give a helpful error if we failed to find any implementation */
if (distinct_rel->pathlist == NIL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("could not implement DISTINCT"),
errdetail("Some of the datatypes only support hashing, while others only support sorting.")));
/*
* If there is an FDW that's responsible for all baserels of the query,
* let it consider adding ForeignPaths.
*/
if (distinct_rel->fdwroutine &&
distinct_rel->fdwroutine->GetForeignUpperPaths)
distinct_rel->fdwroutine->GetForeignUpperPaths(root, UPPERREL_DISTINCT,
input_rel, distinct_rel,
NULL);
/* Let extensions possibly add some more paths */
if (create_upper_paths_hook)
(*create_upper_paths_hook) (root, UPPERREL_DISTINCT,
input_rel, distinct_rel, NULL);
/* Now choose the best path(s) */
set_cheapest(distinct_rel);
return distinct_rel;
}

View File

@ -71,6 +71,7 @@ typedef enum UpperRelationKind
* any */
UPPERREL_GROUP_AGG, /* result of grouping/aggregation, if any */
UPPERREL_WINDOW, /* result of window functions, if any */
UPPERREL_PARTIAL_DISTINCT, /* result of partial "SELECT DISTINCT", if any */
UPPERREL_DISTINCT, /* result of "SELECT DISTINCT", if any */
UPPERREL_ORDERED, /* result of ORDER BY, if any */
UPPERREL_FINAL /* result of any remaining top-level actions */

View File

@ -210,6 +210,73 @@ DROP TABLE distinct_hash_1;
DROP TABLE distinct_hash_2;
DROP TABLE distinct_group_1;
DROP TABLE distinct_group_2;
-- Test parallel DISTINCT
SET parallel_tuple_cost=0;
SET parallel_setup_cost=0;
SET min_parallel_table_scan_size=0;
-- Ensure we get a parallel plan
EXPLAIN (costs off)
SELECT DISTINCT four FROM tenk1;
QUERY PLAN
----------------------------------------------------
Unique
-> Sort
Sort Key: four
-> Gather
Workers Planned: 2
-> HashAggregate
Group Key: four
-> Parallel Seq Scan on tenk1
(8 rows)
-- Ensure the parallel plan produces the correct results
SELECT DISTINCT four FROM tenk1;
four
------
0
1
2
3
(4 rows)
CREATE OR REPLACE FUNCTION distinct_func(a INT) RETURNS INT AS $$
BEGIN
RETURN a;
END;
$$ LANGUAGE plpgsql PARALLEL UNSAFE;
-- Ensure we don't do parallel distinct with a parallel unsafe function
EXPLAIN (COSTS OFF)
SELECT DISTINCT distinct_func(1) FROM tenk1;
QUERY PLAN
----------------------------------------------------------
Unique
-> Sort
Sort Key: (distinct_func(1))
-> Index Only Scan using tenk1_hundred on tenk1
(4 rows)
-- make the function parallel safe
CREATE OR REPLACE FUNCTION distinct_func(a INT) RETURNS INT AS $$
BEGIN
RETURN a;
END;
$$ LANGUAGE plpgsql PARALLEL SAFE;
-- Ensure we do parallel distinct now that the function is parallel safe
EXPLAIN (COSTS OFF)
SELECT DISTINCT distinct_func(1) FROM tenk1;
QUERY PLAN
----------------------------------------------
Unique
-> Sort
Sort Key: (distinct_func(1))
-> Gather
Workers Planned: 2
-> Parallel Seq Scan on tenk1
(6 rows)
RESET min_parallel_table_scan_size;
RESET parallel_setup_cost;
RESET parallel_tuple_cost;
--
-- Also, some tests of IS DISTINCT FROM, which doesn't quite deserve its
-- very own regression file.

View File

@ -107,6 +107,43 @@ DROP TABLE distinct_hash_2;
DROP TABLE distinct_group_1;
DROP TABLE distinct_group_2;
-- Test parallel DISTINCT
SET parallel_tuple_cost=0;
SET parallel_setup_cost=0;
SET min_parallel_table_scan_size=0;
-- Ensure we get a parallel plan
EXPLAIN (costs off)
SELECT DISTINCT four FROM tenk1;
-- Ensure the parallel plan produces the correct results
SELECT DISTINCT four FROM tenk1;
CREATE OR REPLACE FUNCTION distinct_func(a INT) RETURNS INT AS $$
BEGIN
RETURN a;
END;
$$ LANGUAGE plpgsql PARALLEL UNSAFE;
-- Ensure we don't do parallel distinct with a parallel unsafe function
EXPLAIN (COSTS OFF)
SELECT DISTINCT distinct_func(1) FROM tenk1;
-- make the function parallel safe
CREATE OR REPLACE FUNCTION distinct_func(a INT) RETURNS INT AS $$
BEGIN
RETURN a;
END;
$$ LANGUAGE plpgsql PARALLEL SAFE;
-- Ensure we do parallel distinct now that the function is parallel safe
EXPLAIN (COSTS OFF)
SELECT DISTINCT distinct_func(1) FROM tenk1;
RESET min_parallel_table_scan_size;
RESET parallel_setup_cost;
RESET parallel_tuple_cost;
--
-- Also, some tests of IS DISTINCT FROM, which doesn't quite deserve its
-- very own regression file.