From 1f39bce021540fde00990af55b4432c55ef4b3c7 Mon Sep 17 00:00:00 2001 From: Jeff Davis Date: Wed, 18 Mar 2020 15:42:02 -0700 Subject: [PATCH] Disk-based Hash Aggregation. While performing hash aggregation, track memory usage when adding new groups to a hash table. If the memory usage exceeds work_mem, enter "spill mode". In spill mode, new groups are not created in the hash table(s), but existing groups continue to be advanced if input tuples match. Tuples that would cause a new group to be created are instead spilled to a logical tape to be processed later. The tuples are spilled in a partitioned fashion. When all tuples from the outer plan are processed (either by advancing the group or spilling the tuple), finalize and emit the groups from the hash table. Then, create new batches of work from the spilled partitions, and select one of the saved batches and process it (possibly spilling recursively). Author: Jeff Davis Reviewed-by: Tomas Vondra, Adam Lee, Justin Pryzby, Taylor Vesely, Melanie Plageman Discussion: https://postgr.es/m/507ac540ec7c20136364b5272acbcd4574aa76ef.camel@j-davis.com --- doc/src/sgml/config.sgml | 32 + src/backend/commands/explain.c | 37 + src/backend/executor/nodeAgg.c | 1092 ++++++++++++++++- src/backend/optimizer/path/costsize.c | 70 +- src/backend/optimizer/plan/planner.c | 19 +- src/backend/optimizer/prep/prepunion.c | 2 +- src/backend/optimizer/util/pathnode.c | 14 +- src/backend/utils/misc/guc.c | 20 + src/include/executor/nodeAgg.h | 8 + src/include/nodes/execnodes.h | 22 +- src/include/optimizer/cost.h | 4 +- src/test/regress/expected/aggregates.out | 184 +++ src/test/regress/expected/groupingsets.out | 122 ++ src/test/regress/expected/select_distinct.out | 62 + src/test/regress/expected/sysviews.out | 4 +- src/test/regress/sql/aggregates.sql | 131 ++ src/test/regress/sql/groupingsets.sql | 103 ++ src/test/regress/sql/select_distinct.sql | 62 + 18 files changed, 1950 insertions(+), 38 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 672bf6f1ee..70854ae298 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4482,6 +4482,23 @@ ANY num_sync ( + enable_groupingsets_hash_disk (boolean) + + enable_groupingsets_hash_disk configuration parameter + + + + + Enables or disables the query planner's use of hashed aggregation plan + types for grouping sets when the total size of the hash tables is + expected to exceed work_mem. See . The default is + off. + + + + enable_hashagg (boolean) @@ -4496,6 +4513,21 @@ ANY num_sync ( + enable_hashagg_disk (boolean) + + enable_hashagg_disk configuration parameter + + + + + Enables or disables the query planner's use of hashed aggregation plan + types when the memory usage is expected to exceed + work_mem. The default is on. 
+ + + + enable_hashjoin (boolean) diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index d901dc4a50..58141d8393 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -104,6 +104,7 @@ static void show_tablesample(TableSampleClause *tsc, PlanState *planstate, List *ancestors, ExplainState *es); static void show_sort_info(SortState *sortstate, ExplainState *es); static void show_hash_info(HashState *hashstate, ExplainState *es); +static void show_hashagg_info(AggState *hashstate, ExplainState *es); static void show_tidbitmap_info(BitmapHeapScanState *planstate, ExplainState *es); static void show_instrumentation_count(const char *qlabel, int which, @@ -1882,6 +1883,7 @@ ExplainNode(PlanState *planstate, List *ancestors, case T_Agg: show_agg_keys(castNode(AggState, planstate), ancestors, es); show_upper_qual(plan->qual, "Filter", planstate, ancestors, es); + show_hashagg_info((AggState *) planstate, es); if (plan->qual) show_instrumentation_count("Rows Removed by Filter", 1, planstate, es); @@ -2769,6 +2771,41 @@ show_hash_info(HashState *hashstate, ExplainState *es) } } +/* + * Show information on hash aggregate memory usage and batches. + */ +static void +show_hashagg_info(AggState *aggstate, ExplainState *es) +{ + Agg *agg = (Agg *)aggstate->ss.ps.plan; + long memPeakKb = (aggstate->hash_mem_peak + 1023) / 1024; + + Assert(IsA(aggstate, AggState)); + + if (agg->aggstrategy != AGG_HASHED && + agg->aggstrategy != AGG_MIXED) + return; + + if (es->costs && aggstate->hash_planned_partitions > 0) + { + ExplainPropertyInteger("Planned Partitions", NULL, + aggstate->hash_planned_partitions, es); + } + + if (!es->analyze) + return; + + /* EXPLAIN ANALYZE */ + ExplainPropertyInteger("Peak Memory Usage", "kB", memPeakKb, es); + if (aggstate->hash_batches_used > 0) + { + ExplainPropertyInteger("Disk Usage", "kB", + aggstate->hash_disk_used, es); + ExplainPropertyInteger("HashAgg Batches", NULL, + aggstate->hash_batches_used, es); + } +} + /* * If it's EXPLAIN ANALYZE, show exact/lossy pages for a BitmapHeapScan node */ diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index 7aebb247d8..44c159ab2a 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -194,6 +194,29 @@ * transition values. hashcontext is the single context created to support * all hash tables. * + * Spilling To Disk + * + * When performing hash aggregation, if the hash table memory exceeds the + * limit (see hash_agg_check_limits()), we enter "spill mode". In spill + * mode, we advance the transition states only for groups already in the + * hash table. For tuples that would need to create a new hash table + * entries (and initialize new transition states), we instead spill them to + * disk to be processed later. The tuples are spilled in a partitioned + * manner, so that subsequent batches are smaller and less likely to exceed + * work_mem (if a batch does exceed work_mem, it must be spilled + * recursively). + * + * Spilled data is written to logical tapes. These provide better control + * over memory usage, disk space, and the number of files than if we were + * to use a BufFile for each spill. + * + * Note that it's possible for transition states to start small but then + * grow very large; for instance in the case of ARRAY_AGG. In such cases, + * it's still possible to significantly exceed work_mem. 
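As a rough sketch of the per-tuple flow just described (the helper names here are hypothetical stand-ins for the real lookup, transition, and spill routines, not the patch's actual API):

#include <stdbool.h>
#include <stdint.h>

typedef struct Group Group;     /* stand-in for one hash table entry */

/* hypothetical helpers, for illustration only */
extern Group *lookup_group(uint32_t hash, bool allow_create, bool *created);
extern void   advance_transition(Group *group);
extern void   spill_tuple(uint32_t hash);
extern bool   over_memory_or_group_limit(void);

static bool hash_spill_mode = false;

static void
process_input_tuple(uint32_t hash)
{
    bool   created = false;
    /* in spill mode, only groups already in the hash table may be advanced */
    Group *group = lookup_group(hash, !hash_spill_mode, &created);

    if (group != NULL)
    {
        advance_transition(group);
        /* the limits are checked only when a new group was just added */
        if (created && over_memory_or_group_limit())
            hash_spill_mode = true;     /* stop creating groups; spill instead */
    }
    else
        spill_tuple(hash);              /* route the tuple to a spill partition */
}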
We try to avoid + * this situation by estimating what will fit in the available memory, and + * imposing a limit on the number of groups separately from the amount of + * memory consumed. + * * Transition / Combine function invocation: * * For performance reasons transition functions, including combine @@ -233,12 +256,105 @@ #include "utils/acl.h" #include "utils/builtins.h" #include "utils/datum.h" +#include "utils/dynahash.h" #include "utils/expandeddatum.h" +#include "utils/logtape.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/syscache.h" #include "utils/tuplesort.h" +/* + * Control how many partitions are created when spilling HashAgg to + * disk. + * + * HASHAGG_PARTITION_FACTOR is multiplied by the estimated number of + * partitions needed such that each partition will fit in memory. The factor + * is set higher than one because there's not a high cost to having a few too + * many partitions, and it makes it less likely that a partition will need to + * be spilled recursively. Another benefit of having more, smaller partitions + * is that small hash tables may perform better than large ones due to memory + * caching effects. + * + * We also specify a min and max number of partitions per spill. Too few might + * mean a lot of wasted I/O from repeated spilling of the same tuples. Too + * many will result in lots of memory wasted buffering the spill files (which + * could instead be spent on a larger hash table). + */ +#define HASHAGG_PARTITION_FACTOR 1.50 +#define HASHAGG_MIN_PARTITIONS 4 +#define HASHAGG_MAX_PARTITIONS 1024 + +/* + * For reading from tapes, the buffer size must be a multiple of + * BLCKSZ. Larger values help when reading from multiple tapes concurrently, + * but that doesn't happen in HashAgg, so we simply use BLCKSZ. Writing to a + * tape always uses a buffer of size BLCKSZ. + */ +#define HASHAGG_READ_BUFFER_SIZE BLCKSZ +#define HASHAGG_WRITE_BUFFER_SIZE BLCKSZ + +/* minimum number of initial hash table buckets */ +#define HASHAGG_MIN_BUCKETS 256 + +/* + * Track all tapes needed for a HashAgg that spills. We don't know the maximum + * number of tapes needed at the start of the algorithm (because it can + * recurse), so one tape set is allocated and extended as needed for new + * tapes. When a particular tape is already read, rewind it for write mode and + * put it in the free list. + * + * Tapes' buffers can take up substantial memory when many tapes are open at + * once. We only need one tape open at a time in read mode (using a buffer + * that's a multiple of BLCKSZ); but we need one tape open in write mode (each + * requiring a buffer of size BLCKSZ) for each partition. + */ +typedef struct HashTapeInfo +{ + LogicalTapeSet *tapeset; + int ntapes; + int *freetapes; + int nfreetapes; + int freetapes_alloc; +} HashTapeInfo; + +/* + * Represents partitioned spill data for a single hashtable. Contains the + * necessary information to route tuples to the correct partition, and to + * transform the spilled data into new batches. + * + * The high bits are used for partition selection (when recursing, we ignore + * the bits that have already been used for partition selection at an earlier + * level). 
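For example, with 16 partitions at the first spill level the partition number comes from the top four bits of the hash value, and a recursive spill consumes the next-lower bits. A small self-contained illustration of that arithmetic (the constants are arbitrary):

#include <stdint.h>
#include <stdio.h>

int
main(void)
{
    uint32_t hash = 0xDEADBEEFu;        /* example hash value */
    int      used_bits = 0;             /* bits consumed by earlier spill levels */
    int      partition_bits = 4;        /* log2(npartitions) chosen for this level */
    int      npartitions = 1 << partition_bits;
    int      shift = 32 - used_bits - partition_bits;
    uint32_t mask = (uint32_t) (npartitions - 1) << shift;
    int      partition = (int) ((hash & mask) >> shift);

    printf("hash %08x -> partition %d of %d\n", hash, partition, npartitions);

    /* if this partition spills again, the next level starts at
     * used_bits + partition_bits and consumes the next-lower bits */
    return 0;
}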
+ */ +typedef struct HashAggSpill +{ + LogicalTapeSet *tapeset; /* borrowed reference to tape set */ + int npartitions; /* number of partitions */ + int *partitions; /* spill partition tape numbers */ + int64 *ntuples; /* number of tuples in each partition */ + uint32 mask; /* mask to find partition from hash value */ + int shift; /* after masking, shift by this amount */ +} HashAggSpill; + +/* + * Represents work to be done for one pass of hash aggregation (with only one + * grouping set). + * + * Also tracks the bits of the hash already used for partition selection by + * earlier iterations, so that this batch can use new bits. If all bits have + * already been used, no partitioning will be done (any spilled data will go + * to a single output tape). + */ +typedef struct HashAggBatch +{ + int setno; /* grouping set */ + int used_bits; /* number of bits of hash already used */ + LogicalTapeSet *tapeset; /* borrowed reference to tape set */ + int input_tapenum; /* input partition tape */ + int64 input_tuples; /* number of tuples in this batch */ +} HashAggBatch; + static void select_current_set(AggState *aggstate, int setno, bool is_hash); static void initialize_phase(AggState *aggstate, int newphase); static TupleTableSlot *fetch_input_tuple(AggState *aggstate); @@ -275,11 +391,43 @@ static Bitmapset *find_unaggregated_cols(AggState *aggstate); static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos); static void build_hash_tables(AggState *aggstate); static void build_hash_table(AggState *aggstate, int setno, long nbuckets); +static void hashagg_recompile_expressions(AggState *aggstate, bool minslot, + bool nullcheck); +static long hash_choose_num_buckets(double hashentrysize, + long estimated_nbuckets, + Size memory); +static int hash_choose_num_partitions(uint64 input_groups, + double hashentrysize, + int used_bits, + int *log2_npartittions); static AggStatePerGroup lookup_hash_entry(AggState *aggstate, uint32 hash); static void lookup_hash_entries(AggState *aggstate); static TupleTableSlot *agg_retrieve_direct(AggState *aggstate); static void agg_fill_hash_table(AggState *aggstate); +static bool agg_refill_hash_table(AggState *aggstate); static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate); +static TupleTableSlot *agg_retrieve_hash_table_in_memory(AggState *aggstate); +static void hash_agg_check_limits(AggState *aggstate); +static void hash_agg_enter_spill_mode(AggState *aggstate); +static void hash_agg_update_metrics(AggState *aggstate, bool from_tape, + int npartitions); +static void hashagg_finish_initial_spills(AggState *aggstate); +static void hashagg_reset_spill_state(AggState *aggstate); +static HashAggBatch *hashagg_batch_new(LogicalTapeSet *tapeset, + int input_tapenum, int setno, + int64 input_tuples, int used_bits); +static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp); +static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, + int used_bits, uint64 input_tuples, + double hashentrysize); +static Size hashagg_spill_tuple(HashAggSpill *spill, TupleTableSlot *slot, + uint32 hash); +static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, + int setno); +static void hashagg_tapeinfo_init(AggState *aggstate); +static void hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *dest, + int ndest); +static void hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum); static Datum GetAggInitVal(Datum textInitVal, Oid transtype); static void build_pertrans_for_aggref(AggStatePerTrans 
pertrans, AggState *aggstate, EState *estate, @@ -1287,14 +1435,27 @@ build_hash_tables(AggState *aggstate) for (setno = 0; setno < aggstate->num_hashes; ++setno) { AggStatePerHash perhash = &aggstate->perhash[setno]; + long nbuckets; + Size memory; + + if (perhash->hashtable != NULL) + { + ResetTupleHashTable(perhash->hashtable); + continue; + } Assert(perhash->aggnode->numGroups > 0); - if (perhash->hashtable) - ResetTupleHashTable(perhash->hashtable); - else - build_hash_table(aggstate, setno, perhash->aggnode->numGroups); + memory = aggstate->hash_mem_limit / aggstate->num_hashes; + + /* choose reasonable number of buckets per hashtable */ + nbuckets = hash_choose_num_buckets( + aggstate->hashentrysize, perhash->aggnode->numGroups, memory); + + build_hash_table(aggstate, setno, nbuckets); } + + aggstate->hash_ngroups_current = 0; } /* @@ -1304,7 +1465,7 @@ static void build_hash_table(AggState *aggstate, int setno, long nbuckets) { AggStatePerHash perhash = &aggstate->perhash[setno]; - MemoryContext metacxt = aggstate->ss.ps.state->es_query_cxt; + MemoryContext metacxt = aggstate->hash_metacxt; MemoryContext hashcxt = aggstate->hashcontext->ecxt_per_tuple_memory; MemoryContext tmpcxt = aggstate->tmpcontext->ecxt_per_tuple_memory; Size additionalsize; @@ -1487,6 +1648,320 @@ hash_agg_entry_size(int numAggs, Size tupleWidth, Size transitionSpace) transitionSpace; } +/* + * hashagg_recompile_expressions() + * + * Identifies the right phase, compiles the right expression given the + * arguments, and then sets phase->evalfunc to that expression. + * + * Different versions of the compiled expression are needed depending on + * whether hash aggregation has spilled or not, and whether it's reading from + * the outer plan or a tape. Before spilling to disk, the expression reads + * from the outer plan and does not need to perform a NULL check. After + * HashAgg begins to spill, new groups will not be created in the hash table, + * and the AggStatePerGroup array may be NULL; therefore we need to add a null + * pointer check to the expression. Then, when reading spilled data from a + * tape, we change the outer slot type to be a fixed minimal tuple slot. + * + * It would be wasteful to recompile every time, so cache the compiled + * expressions in the AggStatePerPhase, and reuse when appropriate. + */ +static void +hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck) +{ + AggStatePerPhase phase; + int i = minslot ? 1 : 0; + int j = nullcheck ? 1 : 0; + + Assert(aggstate->aggstrategy == AGG_HASHED || + aggstate->aggstrategy == AGG_MIXED); + + if (aggstate->aggstrategy == AGG_HASHED) + phase = &aggstate->phases[0]; + else /* AGG_MIXED */ + phase = &aggstate->phases[1]; + + if (phase->evaltrans_cache[i][j] == NULL) + { + const TupleTableSlotOps *outerops = aggstate->ss.ps.outerops; + bool outerfixed = aggstate->ss.ps.outeropsfixed; + bool dohash = true; + bool dosort; + + dosort = aggstate->aggstrategy == AGG_MIXED ? true : false; + + /* temporarily change the outerops while compiling the expression */ + if (minslot) + { + aggstate->ss.ps.outerops = &TTSOpsMinimalTuple; + aggstate->ss.ps.outeropsfixed = true; + } + + phase->evaltrans_cache[i][j] = ExecBuildAggTrans( + aggstate, phase, dosort, dohash, nullcheck); + + /* change back */ + aggstate->ss.ps.outerops = outerops; + aggstate->ss.ps.outeropsfixed = outerfixed; + } + + phase->evaltrans = phase->evaltrans_cache[i][j]; +} + +/* + * Set limits that trigger spilling to avoid exceeding work_mem. 
Consider the + * number of partitions we expect to create (if we do spill). + * + * There are two limits: a memory limit, and also an ngroups limit. The + * ngroups limit becomes important when we expect transition values to grow + * substantially larger than the initial value. + */ +void +hash_agg_set_limits(double hashentrysize, uint64 input_groups, int used_bits, + Size *mem_limit, uint64 *ngroups_limit, + int *num_partitions) +{ + int npartitions; + Size partition_mem; + + /* if not expected to spill, use all of work_mem */ + if (input_groups * hashentrysize < work_mem * 1024L) + { + *mem_limit = work_mem * 1024L; + *ngroups_limit = *mem_limit / hashentrysize; + return; + } + + /* + * Calculate expected memory requirements for spilling, which is the size + * of the buffers needed for all the tapes that need to be open at + * once. Then, subtract that from the memory available for holding hash + * tables. + */ + npartitions = hash_choose_num_partitions(input_groups, + hashentrysize, + used_bits, + NULL); + if (num_partitions != NULL) + *num_partitions = npartitions; + + partition_mem = + HASHAGG_READ_BUFFER_SIZE + + HASHAGG_WRITE_BUFFER_SIZE * npartitions; + + /* + * Don't set the limit below 3/4 of work_mem. In that case, we are at the + * minimum number of partitions, so we aren't going to dramatically exceed + * work mem anyway. + */ + if (work_mem * 1024L > 4 * partition_mem) + *mem_limit = work_mem * 1024L - partition_mem; + else + *mem_limit = work_mem * 1024L * 0.75; + + if (*mem_limit > hashentrysize) + *ngroups_limit = *mem_limit / hashentrysize; + else + *ngroups_limit = 1; +} + +/* + * hash_agg_check_limits + * + * After adding a new group to the hash table, check whether we need to enter + * spill mode. Allocations may happen without adding new groups (for instance, + * if the transition state size grows), so this check is imperfect. + */ +static void +hash_agg_check_limits(AggState *aggstate) +{ + uint64 ngroups = aggstate->hash_ngroups_current; + Size meta_mem = MemoryContextMemAllocated( + aggstate->hash_metacxt, true); + Size hash_mem = MemoryContextMemAllocated( + aggstate->hashcontext->ecxt_per_tuple_memory, true); + + /* + * Don't spill unless there's at least one group in the hash table so we + * can be sure to make progress even in edge cases. + */ + if (aggstate->hash_ngroups_current > 0 && + (meta_mem + hash_mem > aggstate->hash_mem_limit || + ngroups > aggstate->hash_ngroups_limit)) + { + hash_agg_enter_spill_mode(aggstate); + } +} + +/* + * Enter "spill mode", meaning that no new groups are added to any of the hash + * tables. Tuples that would create a new group are instead spilled, and + * processed later. + */ +static void +hash_agg_enter_spill_mode(AggState *aggstate) +{ + aggstate->hash_spill_mode = true; + hashagg_recompile_expressions(aggstate, aggstate->table_filled, true); + + if (!aggstate->hash_ever_spilled) + { + Assert(aggstate->hash_tapeinfo == NULL); + Assert(aggstate->hash_spills == NULL); + + aggstate->hash_ever_spilled = true; + + hashagg_tapeinfo_init(aggstate); + + aggstate->hash_spills = palloc( + sizeof(HashAggSpill) * aggstate->num_hashes); + + for (int setno = 0; setno < aggstate->num_hashes; setno++) + { + AggStatePerHash perhash = &aggstate->perhash[setno]; + HashAggSpill *spill = &aggstate->hash_spills[setno]; + + hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0, + perhash->aggnode->numGroups, + aggstate->hashentrysize); + } + } +} + +/* + * Update metrics after filling the hash table. 
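A worked example of hash_agg_set_limits() may help. Assume work_mem of 4MB, an estimated 200 bytes per group, one million input groups, and that hash_choose_num_partitions() settles on 128 partitions; the following standalone sketch mirrors the formulas above with those illustrative numbers:

#include <stdint.h>
#include <stdio.h>

#define BLCKSZ 8192                     /* assumed default PostgreSQL block size */
#define READ_BUFFER_SIZE  BLCKSZ
#define WRITE_BUFFER_SIZE BLCKSZ

int
main(void)
{
    long     work_mem = 4096;           /* kB, i.e. 4MB (illustrative) */
    double   hashentrysize = 200.0;     /* estimated bytes per group */
    double   input_groups = 1000000.0;  /* planner's group estimate */
    int      npartitions = 128;         /* assume hash_choose_num_partitions() chose this */
    size_t   partition_mem = READ_BUFFER_SIZE + WRITE_BUFFER_SIZE * npartitions;
    size_t   mem_limit;
    uint64_t ngroups_limit;

    if (input_groups * hashentrysize < work_mem * 1024L)
        mem_limit = work_mem * 1024L;                 /* everything fits: use all of work_mem */
    else if (work_mem * 1024L > 4 * partition_mem)
        mem_limit = work_mem * 1024L - partition_mem; /* reserve room for tape buffers */
    else
        mem_limit = work_mem * 1024L * 0.75;          /* but never go below 3/4 of work_mem */

    ngroups_limit = mem_limit > hashentrysize ? mem_limit / hashentrysize : 1;

    printf("mem_limit = %zu bytes, ngroups_limit = %llu groups\n",
           (size_t) mem_limit, (unsigned long long) ngroups_limit);
    return 0;
}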
+ * + * If reading from the outer plan, from_tape should be false; if reading from + * another tape, from_tape should be true. + */ +static void +hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions) +{ + Size meta_mem; + Size hash_mem; + Size buffer_mem; + Size total_mem; + + if (aggstate->aggstrategy != AGG_MIXED && + aggstate->aggstrategy != AGG_HASHED) + return; + + /* memory for the hash table itself */ + meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt, true); + + /* memory for the group keys and transition states */ + hash_mem = MemoryContextMemAllocated( + aggstate->hashcontext->ecxt_per_tuple_memory, true); + + /* memory for read/write tape buffers, if spilled */ + buffer_mem = npartitions * HASHAGG_WRITE_BUFFER_SIZE; + if (from_tape) + buffer_mem += HASHAGG_READ_BUFFER_SIZE; + + /* update peak mem */ + total_mem = meta_mem + hash_mem + buffer_mem; + if (total_mem > aggstate->hash_mem_peak) + aggstate->hash_mem_peak = total_mem; + + /* update disk usage */ + if (aggstate->hash_tapeinfo != NULL) + { + uint64 disk_used = LogicalTapeSetBlocks( + aggstate->hash_tapeinfo->tapeset) * (BLCKSZ / 1024); + + if (aggstate->hash_disk_used < disk_used) + aggstate->hash_disk_used = disk_used; + } + + /* + * Update hashentrysize estimate based on contents. Don't include meta_mem + * in the memory used, because empty buckets would inflate the per-entry + * cost. An underestimate of the per-entry size is better than an + * overestimate, because an overestimate could compound with each level of + * recursion. + */ + if (aggstate->hash_ngroups_current > 0) + { + aggstate->hashentrysize = + hash_mem / (double)aggstate->hash_ngroups_current; + } +} + +/* + * Choose a reasonable number of buckets for the initial hash table size. + */ +static long +hash_choose_num_buckets(double hashentrysize, long ngroups, Size memory) +{ + long max_nbuckets; + long nbuckets = ngroups; + + max_nbuckets = memory / hashentrysize; + + /* + * Leave room for slop to avoid a case where the initial hash table size + * exceeds the memory limit (though that may still happen in edge cases). + */ + max_nbuckets *= 0.75; + + if (nbuckets > max_nbuckets) + nbuckets = max_nbuckets; + if (nbuckets < HASHAGG_MIN_BUCKETS) + nbuckets = HASHAGG_MIN_BUCKETS; + return nbuckets; +} + +/* + * Determine the number of partitions to create when spilling, which will + * always be a power of two. If log2_npartitions is non-NULL, set + * *log2_npartitions to the log2() of the number of partitions. + */ +static int +hash_choose_num_partitions(uint64 input_groups, double hashentrysize, + int used_bits, int *log2_npartitions) +{ + Size mem_wanted; + int partition_limit; + int npartitions; + int partition_bits; + + /* + * Avoid creating so many partitions that the memory requirements of the + * open partition files are greater than 1/4 of work_mem. 
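Continuing the same example, the initial bucket count is just the planner's group estimate clamped to roughly three quarters of the per-table memory budget, with a floor of HASHAGG_MIN_BUCKETS. A standalone rendering of that arithmetic with illustrative numbers:

#include <stdio.h>

#define HASHAGG_MIN_BUCKETS 256

int
main(void)
{
    double hashentrysize = 200.0;     /* estimated bytes per group (illustrative) */
    long   ngroups = 1000000;         /* planner's group estimate */
    long   memory = 3145728;          /* memory budget for a single hash table */
    long   max_nbuckets = memory / hashentrysize;
    long   nbuckets = ngroups;

    max_nbuckets *= 0.75;             /* slop so the initial table stays under the limit */

    if (nbuckets > max_nbuckets)
        nbuckets = max_nbuckets;
    if (nbuckets < HASHAGG_MIN_BUCKETS)
        nbuckets = HASHAGG_MIN_BUCKETS;

    printf("initial nbuckets = %ld\n", nbuckets);
    return 0;
}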
+ */ + partition_limit = + (work_mem * 1024L * 0.25 - HASHAGG_READ_BUFFER_SIZE) / + HASHAGG_WRITE_BUFFER_SIZE; + + mem_wanted = HASHAGG_PARTITION_FACTOR * input_groups * hashentrysize; + + /* make enough partitions so that each one is likely to fit in memory */ + npartitions = 1 + (mem_wanted / (work_mem * 1024L)); + + if (npartitions > partition_limit) + npartitions = partition_limit; + + if (npartitions < HASHAGG_MIN_PARTITIONS) + npartitions = HASHAGG_MIN_PARTITIONS; + if (npartitions > HASHAGG_MAX_PARTITIONS) + npartitions = HASHAGG_MAX_PARTITIONS; + + /* ceil(log2(npartitions)) */ + partition_bits = my_log2(npartitions); + + /* make sure that we don't exhaust the hash bits */ + if (partition_bits + used_bits >= 32) + partition_bits = 32 - used_bits; + + if (log2_npartitions != NULL) + *log2_npartitions = partition_bits; + + /* number of partitions will be a power of two */ + npartitions = 1L << partition_bits; + + return npartitions; +} + /* * Find or create a hashtable entry for the tuple group containing the current * tuple (already set in tmpcontext's outertuple slot), in the current grouping @@ -1495,6 +1970,10 @@ hash_agg_entry_size(int numAggs, Size tupleWidth, Size transitionSpace) * * When called, CurrentMemoryContext should be the per-query context. The * already-calculated hash value for the tuple must be specified. + * + * If in "spill mode", then only find existing hashtable entries; don't create + * new ones. If a tuple's group is not already present in the hash table for + * the current grouping set, return NULL and the caller will spill it to disk. */ static AggStatePerGroup lookup_hash_entry(AggState *aggstate, uint32 hash) @@ -1502,16 +1981,26 @@ lookup_hash_entry(AggState *aggstate, uint32 hash) AggStatePerHash perhash = &aggstate->perhash[aggstate->current_set]; TupleTableSlot *hashslot = perhash->hashslot; TupleHashEntryData *entry; - bool isnew; + bool isnew = false; + bool *p_isnew; + + /* if hash table already spilled, don't create new entries */ + p_isnew = aggstate->hash_spill_mode ? NULL : &isnew; /* find or create the hashtable entry using the filtered tuple */ - entry = LookupTupleHashEntryHash(perhash->hashtable, hashslot, &isnew, + entry = LookupTupleHashEntryHash(perhash->hashtable, hashslot, p_isnew, hash); + if (entry == NULL) + return NULL; + if (isnew) { - AggStatePerGroup pergroup; - int transno; + AggStatePerGroup pergroup; + int transno; + + aggstate->hash_ngroups_current++; + hash_agg_check_limits(aggstate); pergroup = (AggStatePerGroup) MemoryContextAlloc(perhash->hashtable->tablecxt, @@ -1539,23 +2028,48 @@ lookup_hash_entry(AggState *aggstate, uint32 hash) * returning an array of pergroup pointers suitable for advance_aggregates. * * Be aware that lookup_hash_entry can reset the tmpcontext. + * + * Some entries may be left NULL if we are in "spill mode". The same tuple + * will belong to different groups for each grouping set, so may match a group + * already in memory for one set and match a group not in memory for another + * set. When in "spill mode", the tuple will be spilled for each grouping set + * where it doesn't match a group in memory. + * + * NB: It's possible to spill the same tuple for several different grouping + * sets. This may seem wasteful, but it's actually a trade-off: if we spill + * the tuple multiple times for multiple grouping sets, it can be partitioned + * for each grouping set, making the refilling of the hash table very + * efficient. 
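Returning to hash_choose_num_partitions(), here is a standalone sketch of its arithmetic with the same illustrative numbers as above (BLCKSZ is assumed to be the default 8192, and ceil(log2()) stands in for my_log2()):

#include <math.h>
#include <stdio.h>

#define BLCKSZ 8192
#define HASHAGG_PARTITION_FACTOR 1.50
#define HASHAGG_MIN_PARTITIONS 4
#define HASHAGG_MAX_PARTITIONS 1024

int
main(void)
{
    long   work_mem = 4096;               /* kB */
    double hashentrysize = 200.0;
    double input_groups = 1000000.0;
    int    used_bits = 0;
    double mem_wanted = HASHAGG_PARTITION_FACTOR * input_groups * hashentrysize;
    int    partition_limit =
        (work_mem * 1024L * 0.25 - BLCKSZ) / BLCKSZ;  /* one read buffer plus write buffers */
    int    npartitions = 1 + (int) (mem_wanted / (work_mem * 1024L));
    int    partition_bits;

    if (npartitions > partition_limit)
        npartitions = partition_limit;
    if (npartitions < HASHAGG_MIN_PARTITIONS)
        npartitions = HASHAGG_MIN_PARTITIONS;
    if (npartitions > HASHAGG_MAX_PARTITIONS)
        npartitions = HASHAGG_MAX_PARTITIONS;

    partition_bits = (int) ceil(log2(npartitions));
    if (partition_bits + used_bits >= 32)
        partition_bits = 32 - used_bits;

    npartitions = 1 << partition_bits;    /* always a power of two */
    printf("spill into %d partitions (%d hash bits)\n", npartitions, partition_bits);
    return 0;
}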
*/ static void lookup_hash_entries(AggState *aggstate) { - int numHashes = aggstate->num_hashes; AggStatePerGroup *pergroup = aggstate->hash_pergroup; int setno; - for (setno = 0; setno < numHashes; setno++) + for (setno = 0; setno < aggstate->num_hashes; setno++) { - AggStatePerHash perhash = &aggstate->perhash[setno]; + AggStatePerHash perhash = &aggstate->perhash[setno]; uint32 hash; select_current_set(aggstate, setno, true); prepare_hash_slot(aggstate); hash = TupleHashTableHash(perhash->hashtable, perhash->hashslot); pergroup[setno] = lookup_hash_entry(aggstate, hash); + + /* check to see if we need to spill the tuple for this grouping set */ + if (pergroup[setno] == NULL) + { + HashAggSpill *spill = &aggstate->hash_spills[setno]; + TupleTableSlot *slot = aggstate->tmpcontext->ecxt_outertuple; + + if (spill->partitions == NULL) + hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0, + perhash->aggnode->numGroups, + aggstate->hashentrysize); + + hashagg_spill_tuple(spill, slot, hash); + } } } @@ -1878,6 +2392,12 @@ agg_retrieve_direct(AggState *aggstate) if (TupIsNull(outerslot)) { /* no more outer-plan tuples available */ + + /* if we built hash tables, finalize any spills */ + if (aggstate->aggstrategy == AGG_MIXED && + aggstate->current_phase == 1) + hashagg_finish_initial_spills(aggstate); + if (hasGroupingSets) { aggstate->input_done = true; @@ -1980,6 +2500,9 @@ agg_fill_hash_table(AggState *aggstate) ResetExprContext(aggstate->tmpcontext); } + /* finalize spills, if any */ + hashagg_finish_initial_spills(aggstate); + aggstate->table_filled = true; /* Initialize to walk the first hash table */ select_current_set(aggstate, 0, true); @@ -1987,11 +2510,189 @@ agg_fill_hash_table(AggState *aggstate) &aggstate->perhash[0].hashiter); } +/* + * If any data was spilled during hash aggregation, reset the hash table and + * reprocess one batch of spilled data. After reprocessing a batch, the hash + * table will again contain data, ready to be consumed by + * agg_retrieve_hash_table_in_memory(). + * + * Should only be called after all in memory hash table entries have been + * finalized and emitted. + * + * Return false when input is exhausted and there's no more work to be done; + * otherwise return true. + */ +static bool +agg_refill_hash_table(AggState *aggstate) +{ + HashAggBatch *batch; + HashAggSpill spill; + HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo; + uint64 ngroups_estimate; + bool spill_initialized = false; + + if (aggstate->hash_batches == NIL) + return false; + + batch = linitial(aggstate->hash_batches); + aggstate->hash_batches = list_delete_first(aggstate->hash_batches); + + /* + * Estimate the number of groups for this batch as the total number of + * tuples in its input file. Although that's a worst case, it's not bad + * here for two reasons: (1) overestimating is better than + * underestimating; and (2) we've already scanned the relation once, so + * it's likely that we've already finalized many of the common values. 
+ */ + ngroups_estimate = batch->input_tuples; + + hash_agg_set_limits(aggstate->hashentrysize, ngroups_estimate, + batch->used_bits, &aggstate->hash_mem_limit, + &aggstate->hash_ngroups_limit, NULL); + + /* there could be residual pergroup pointers; clear them */ + for (int setoff = 0; + setoff < aggstate->maxsets + aggstate->num_hashes; + setoff++) + aggstate->all_pergroups[setoff] = NULL; + + /* free memory and reset hash tables */ + ReScanExprContext(aggstate->hashcontext); + for (int setno = 0; setno < aggstate->num_hashes; setno++) + ResetTupleHashTable(aggstate->perhash[setno].hashtable); + + aggstate->hash_ngroups_current = 0; + + /* + * In AGG_MIXED mode, hash aggregation happens in phase 1 and the output + * happens in phase 0. So, we switch to phase 1 when processing a batch, + * and back to phase 0 after the batch is done. + */ + Assert(aggstate->current_phase == 0); + if (aggstate->phase->aggstrategy == AGG_MIXED) + { + aggstate->current_phase = 1; + aggstate->phase = &aggstate->phases[aggstate->current_phase]; + } + + select_current_set(aggstate, batch->setno, true); + + /* + * Spilled tuples are always read back as MinimalTuples, which may be + * different from the outer plan, so recompile the aggregate expressions. + * + * We still need the NULL check, because we are only processing one + * grouping set at a time and the rest will be NULL. + */ + hashagg_recompile_expressions(aggstate, true, true); + + LogicalTapeRewindForRead(tapeinfo->tapeset, batch->input_tapenum, + HASHAGG_READ_BUFFER_SIZE); + for (;;) { + TupleTableSlot *slot = aggstate->hash_spill_slot; + MinimalTuple tuple; + uint32 hash; + + CHECK_FOR_INTERRUPTS(); + + tuple = hashagg_batch_read(batch, &hash); + if (tuple == NULL) + break; + + ExecStoreMinimalTuple(tuple, slot, true); + aggstate->tmpcontext->ecxt_outertuple = slot; + + prepare_hash_slot(aggstate); + aggstate->hash_pergroup[batch->setno] = lookup_hash_entry(aggstate, hash); + + if (aggstate->hash_pergroup[batch->setno] != NULL) + { + /* Advance the aggregates (or combine functions) */ + advance_aggregates(aggstate); + } + else + { + if (!spill_initialized) + { + /* + * Avoid initializing the spill until we actually need it so + * that we don't assign tapes that will never be used. + */ + spill_initialized = true; + hashagg_spill_init(&spill, tapeinfo, batch->used_bits, + ngroups_estimate, aggstate->hashentrysize); + } + /* no memory for a new group, spill */ + hashagg_spill_tuple(&spill, slot, hash); + } + + /* + * Reset per-input-tuple context after each tuple, but note that the + * hash lookups do this too + */ + ResetExprContext(aggstate->tmpcontext); + } + + hashagg_tapeinfo_release(tapeinfo, batch->input_tapenum); + + /* change back to phase 0 */ + aggstate->current_phase = 0; + aggstate->phase = &aggstate->phases[aggstate->current_phase]; + + if (spill_initialized) + { + hash_agg_update_metrics(aggstate, true, spill.npartitions); + hashagg_spill_finish(aggstate, &spill, batch->setno); + } + else + hash_agg_update_metrics(aggstate, true, 0); + + aggstate->hash_spill_mode = false; + + /* prepare to walk the first hash table */ + select_current_set(aggstate, batch->setno, true); + ResetTupleHashIterator(aggstate->perhash[batch->setno].hashtable, + &aggstate->perhash[batch->setno].hashiter); + + pfree(batch); + + return true; +} + /* * ExecAgg for hashed case: retrieving groups from hash table + * + * After exhausting in-memory tuples, also try refilling the hash table using + * previously-spilled tuples. 
Only returns NULL after all in-memory and + * spilled tuples are exhausted. */ static TupleTableSlot * agg_retrieve_hash_table(AggState *aggstate) +{ + TupleTableSlot *result = NULL; + + while (result == NULL) + { + result = agg_retrieve_hash_table_in_memory(aggstate); + if (result == NULL) + { + if (!agg_refill_hash_table(aggstate)) + { + aggstate->agg_done = true; + break; + } + } + } + + return result; +} + +/* + * Retrieve the groups from the in-memory hash tables without considering any + * spilled tuples. + */ +static TupleTableSlot * +agg_retrieve_hash_table_in_memory(AggState *aggstate) { ExprContext *econtext; AggStatePerAgg peragg; @@ -2020,7 +2721,7 @@ agg_retrieve_hash_table(AggState *aggstate) * We loop retrieving groups until we find one satisfying * aggstate->ss.ps.qual */ - while (!aggstate->agg_done) + for (;;) { TupleTableSlot *hashslot = perhash->hashslot; int i; @@ -2051,8 +2752,6 @@ agg_retrieve_hash_table(AggState *aggstate) } else { - /* No more hashtables, so done */ - aggstate->agg_done = true; return NULL; } } @@ -2109,6 +2808,313 @@ agg_retrieve_hash_table(AggState *aggstate) return NULL; } +/* + * Initialize HashTapeInfo + */ +static void +hashagg_tapeinfo_init(AggState *aggstate) +{ + HashTapeInfo *tapeinfo = palloc(sizeof(HashTapeInfo)); + int init_tapes = 16; /* expanded dynamically */ + + tapeinfo->tapeset = LogicalTapeSetCreate(init_tapes, NULL, NULL, -1); + tapeinfo->ntapes = init_tapes; + tapeinfo->nfreetapes = init_tapes; + tapeinfo->freetapes_alloc = init_tapes; + tapeinfo->freetapes = palloc(init_tapes * sizeof(int)); + for (int i = 0; i < init_tapes; i++) + tapeinfo->freetapes[i] = i; + + aggstate->hash_tapeinfo = tapeinfo; +} + +/* + * Assign unused tapes to spill partitions, extending the tape set if + * necessary. + */ +static void +hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *partitions, + int npartitions) +{ + int partidx = 0; + + /* use free tapes if available */ + while (partidx < npartitions && tapeinfo->nfreetapes > 0) + partitions[partidx++] = tapeinfo->freetapes[--tapeinfo->nfreetapes]; + + if (partidx < npartitions) + { + LogicalTapeSetExtend(tapeinfo->tapeset, npartitions - partidx); + + while (partidx < npartitions) + partitions[partidx++] = tapeinfo->ntapes++; + } +} + +/* + * After a tape has already been written to and then read, this function + * rewinds it for writing and adds it to the free list. + */ +static void +hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum) +{ + LogicalTapeRewindForWrite(tapeinfo->tapeset, tapenum); + if (tapeinfo->freetapes_alloc == tapeinfo->nfreetapes) + { + tapeinfo->freetapes_alloc <<= 1; + tapeinfo->freetapes = repalloc( + tapeinfo->freetapes, tapeinfo->freetapes_alloc * sizeof(int)); + } + tapeinfo->freetapes[tapeinfo->nfreetapes++] = tapenum; +} + +/* + * hashagg_spill_init + * + * Called after we determined that spilling is necessary. Chooses the number + * of partitions to create, and initializes them. 
+ */ +static void +hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits, + uint64 input_groups, double hashentrysize) +{ + int npartitions; + int partition_bits; + + npartitions = hash_choose_num_partitions( + input_groups, hashentrysize, used_bits, &partition_bits); + + spill->partitions = palloc0(sizeof(int) * npartitions); + spill->ntuples = palloc0(sizeof(int64) * npartitions); + + hashagg_tapeinfo_assign(tapeinfo, spill->partitions, npartitions); + + spill->tapeset = tapeinfo->tapeset; + spill->shift = 32 - used_bits - partition_bits; + spill->mask = (npartitions - 1) << spill->shift; + spill->npartitions = npartitions; +} + +/* + * hashagg_spill_tuple + * + * No room for new groups in the hash table. Save for later in the appropriate + * partition. + */ +static Size +hashagg_spill_tuple(HashAggSpill *spill, TupleTableSlot *slot, uint32 hash) +{ + LogicalTapeSet *tapeset = spill->tapeset; + int partition; + MinimalTuple tuple; + int tapenum; + int total_written = 0; + bool shouldFree; + + Assert(spill->partitions != NULL); + + /* XXX: may contain unnecessary attributes, should project */ + tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree); + + partition = (hash & spill->mask) >> spill->shift; + spill->ntuples[partition]++; + + tapenum = spill->partitions[partition]; + + LogicalTapeWrite(tapeset, tapenum, (void *) &hash, sizeof(uint32)); + total_written += sizeof(uint32); + + LogicalTapeWrite(tapeset, tapenum, (void *) tuple, tuple->t_len); + total_written += tuple->t_len; + + if (shouldFree) + pfree(tuple); + + return total_written; +} + +/* + * hashagg_batch_new + * + * Construct a HashAggBatch item, which represents one iteration of HashAgg to + * be done. + */ +static HashAggBatch * +hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno, + int64 input_tuples, int used_bits) +{ + HashAggBatch *batch = palloc0(sizeof(HashAggBatch)); + + batch->setno = setno; + batch->used_bits = used_bits; + batch->tapeset = tapeset; + batch->input_tapenum = tapenum; + batch->input_tuples = input_tuples; + + return batch; +} + +/* + * read_spilled_tuple + * read the next tuple from a batch's tape. Return NULL if no more. 
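Each spilled record is framed as the 32-bit hash followed by the MinimalTuple, whose own length word leads the tuple body, so the reader can rebuild both without re-hashing. A minimal sketch of that framing over a generic byte stream (write_bytes/read_bytes are hypothetical stand-ins for the logical tape calls):

#include <stdint.h>
#include <string.h>

/* hypothetical byte-stream helpers, for illustration only */
extern void   write_bytes(const void *data, size_t len);
extern size_t read_bytes(void *data, size_t len);

/* on-tape record: [uint32 hash][uint32 t_len][t_len - 4 bytes of tuple body] */
static void
spill_record(uint32_t hash, const void *minimal_tuple, uint32_t t_len)
{
    write_bytes(&hash, sizeof(uint32_t));
    write_bytes(minimal_tuple, t_len);      /* t_len is the tuple's own first field */
}

static int
read_record(uint32_t *hash, char *tuple_buf)
{
    uint32_t t_len;

    if (read_bytes(hash, sizeof(uint32_t)) == 0)
        return 0;                           /* this partition's data is exhausted */
    read_bytes(&t_len, sizeof(uint32_t));
    memcpy(tuple_buf, &t_len, sizeof(uint32_t));    /* the patch pallocs exactly t_len here */
    read_bytes(tuple_buf + sizeof(uint32_t), t_len - sizeof(uint32_t));
    return 1;
}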
+ */ +static MinimalTuple +hashagg_batch_read(HashAggBatch *batch, uint32 *hashp) +{ + LogicalTapeSet *tapeset = batch->tapeset; + int tapenum = batch->input_tapenum; + MinimalTuple tuple; + uint32 t_len; + size_t nread; + uint32 hash; + + nread = LogicalTapeRead(tapeset, tapenum, &hash, sizeof(uint32)); + if (nread == 0) + return NULL; + if (nread != sizeof(uint32)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes", + tapenum, sizeof(uint32), nread))); + if (hashp != NULL) + *hashp = hash; + + nread = LogicalTapeRead(tapeset, tapenum, &t_len, sizeof(t_len)); + if (nread != sizeof(uint32)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes", + tapenum, sizeof(uint32), nread))); + + tuple = (MinimalTuple) palloc(t_len); + tuple->t_len = t_len; + + nread = LogicalTapeRead(tapeset, tapenum, + (void *)((char *)tuple + sizeof(uint32)), + t_len - sizeof(uint32)); + if (nread != t_len - sizeof(uint32)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes", + tapenum, t_len - sizeof(uint32), nread))); + + return tuple; +} + +/* + * hashagg_finish_initial_spills + * + * After a HashAggBatch has been processed, it may have spilled tuples to + * disk. If so, turn the spilled partitions into new batches that must later + * be executed. + */ +static void +hashagg_finish_initial_spills(AggState *aggstate) +{ + int setno; + int total_npartitions = 0; + + if (aggstate->hash_spills != NULL) + { + for (setno = 0; setno < aggstate->num_hashes; setno++) + { + HashAggSpill *spill = &aggstate->hash_spills[setno]; + total_npartitions += spill->npartitions; + hashagg_spill_finish(aggstate, spill, setno); + } + + /* + * We're not processing tuples from outer plan any more; only + * processing batches of spilled tuples. The initial spill structures + * are no longer needed. + */ + pfree(aggstate->hash_spills); + aggstate->hash_spills = NULL; + } + + hash_agg_update_metrics(aggstate, false, total_npartitions); + aggstate->hash_spill_mode = false; +} + +/* + * hashagg_spill_finish + * + * Transform spill partitions into new batches. + */ +static void +hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno) +{ + int i; + int used_bits = 32 - spill->shift; + + if (spill->npartitions == 0) + return; /* didn't spill */ + + for (i = 0; i < spill->npartitions; i++) + { + int tapenum = spill->partitions[i]; + HashAggBatch *new_batch; + + /* if the partition is empty, don't create a new batch of work */ + if (spill->ntuples[i] == 0) + continue; + + new_batch = hashagg_batch_new(aggstate->hash_tapeinfo->tapeset, + tapenum, setno, spill->ntuples[i], + used_bits); + aggstate->hash_batches = lcons(new_batch, aggstate->hash_batches); + aggstate->hash_batches_used++; + } + + pfree(spill->ntuples); + pfree(spill->partitions); +} + +/* + * Free resources related to a spilled HashAgg. 
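Each level of recursion therefore consumes its own slice of hash bits: hashagg_spill_finish() records used_bits = 32 - shift in the batches it creates, and a batch that spills again partitions on the next-lower bits. A small standalone illustration of how the bit budget advances (the per-level partition counts are arbitrary):

#include <stdio.h>

int
main(void)
{
    int used_bits = 0;
    int level_bits[] = {7, 5, 4};   /* log2(npartitions) chosen at each spill level */

    for (int level = 0; level < 3; level++)
    {
        int shift = 32 - used_bits - level_bits[level];

        printf("level %d: partition chosen by hash bits %d..%d\n",
               level, shift, shift + level_bits[level] - 1);
        used_bits = 32 - shift;     /* what this level's batches pass to the next */
    }
    return 0;
}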
+ */ +static void +hashagg_reset_spill_state(AggState *aggstate) +{ + ListCell *lc; + + /* free spills from initial pass */ + if (aggstate->hash_spills != NULL) + { + int setno; + + for (setno = 0; setno < aggstate->num_hashes; setno++) + { + HashAggSpill *spill = &aggstate->hash_spills[setno]; + pfree(spill->ntuples); + pfree(spill->partitions); + } + pfree(aggstate->hash_spills); + aggstate->hash_spills = NULL; + } + + /* free batches */ + foreach(lc, aggstate->hash_batches) + { + HashAggBatch *batch = (HashAggBatch*) lfirst(lc); + pfree(batch); + } + list_free(aggstate->hash_batches); + aggstate->hash_batches = NIL; + + /* close tape set */ + if (aggstate->hash_tapeinfo != NULL) + { + HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo; + + LogicalTapeSetClose(tapeinfo->tapeset); + pfree(tapeinfo->freetapes); + pfree(tapeinfo); + aggstate->hash_tapeinfo = NULL; + } +} + + /* ----------------- * ExecInitAgg * @@ -2518,9 +3524,36 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) */ if (use_hashing) { + Plan *outerplan = outerPlan(node); + uint64 totalGroups = 0; + int i; + + aggstate->hash_metacxt = AllocSetContextCreate( + aggstate->ss.ps.state->es_query_cxt, + "HashAgg meta context", + ALLOCSET_DEFAULT_SIZES); + aggstate->hash_spill_slot = ExecInitExtraTupleSlot( + estate, scanDesc, &TTSOpsMinimalTuple); + /* this is an array of pointers, not structures */ aggstate->hash_pergroup = pergroups; + aggstate->hashentrysize = hash_agg_entry_size( + aggstate->numtrans, outerplan->plan_width, node->transitionSpace); + + /* + * Consider all of the grouping sets together when setting the limits + * and estimating the number of partitions. This can be inaccurate + * when there is more than one grouping set, but should still be + * reasonable. + */ + for (i = 0; i < aggstate->num_hashes; i++) + totalGroups = aggstate->perhash[i].aggnode->numGroups; + + hash_agg_set_limits(aggstate->hashentrysize, totalGroups, 0, + &aggstate->hash_mem_limit, + &aggstate->hash_ngroups_limit, + &aggstate->hash_planned_partitions); find_hash_columns(aggstate); build_hash_tables(aggstate); aggstate->table_filled = false; @@ -2931,6 +3964,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) phase->evaltrans = ExecBuildAggTrans(aggstate, phase, dosort, dohash, false); + /* cache compiled expression for outer slot without NULL check */ + phase->evaltrans_cache[0][0] = phase->evaltrans; } return aggstate; @@ -3424,6 +4459,14 @@ ExecEndAgg(AggState *node) if (node->sort_out) tuplesort_end(node->sort_out); + hashagg_reset_spill_state(node); + + if (node->hash_metacxt != NULL) + { + MemoryContextDelete(node->hash_metacxt); + node->hash_metacxt = NULL; + } + for (transno = 0; transno < node->numtrans; transno++) { AggStatePerTrans pertrans = &node->pertrans[transno]; @@ -3479,12 +4522,13 @@ ExecReScanAgg(AggState *node) return; /* - * If we do have the hash table, and the subplan does not have any - * parameter changes, and none of our own parameter changes affect - * input expressions of the aggregated functions, then we can just - * rescan the existing hash table; no need to build it again. + * If we do have the hash table, and it never spilled, and the subplan + * does not have any parameter changes, and none of our own parameter + * changes affect input expressions of the aggregated functions, then + * we can just rescan the existing hash table; no need to build it + * again. 
*/ - if (outerPlan->chgParam == NULL && + if (outerPlan->chgParam == NULL && !node->hash_ever_spilled && !bms_overlap(node->ss.ps.chgParam, aggnode->aggParams)) { ResetTupleHashIterator(node->perhash[0].hashtable, @@ -3541,11 +4585,19 @@ ExecReScanAgg(AggState *node) */ if (node->aggstrategy == AGG_HASHED || node->aggstrategy == AGG_MIXED) { + hashagg_reset_spill_state(node); + + node->hash_ever_spilled = false; + node->hash_spill_mode = false; + node->hash_ngroups_current = 0; + ReScanExprContext(node->hashcontext); /* Rebuild an empty hash table */ build_hash_tables(node); node->table_filled = false; /* iterator will be reset when the table is filled */ + + hashagg_recompile_expressions(node, false, false); } if (node->aggstrategy != AGG_HASHED) diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index b5a0033721..8cf694b61d 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -77,6 +77,7 @@ #include "access/htup_details.h" #include "access/tsmapi.h" #include "executor/executor.h" +#include "executor/nodeAgg.h" #include "executor/nodeHash.h" #include "miscadmin.h" #include "nodes/makefuncs.h" @@ -128,6 +129,8 @@ bool enable_bitmapscan = true; bool enable_tidscan = true; bool enable_sort = true; bool enable_hashagg = true; +bool enable_hashagg_disk = true; +bool enable_groupingsets_hash_disk = false; bool enable_nestloop = true; bool enable_material = true; bool enable_mergejoin = true; @@ -2153,7 +2156,7 @@ cost_agg(Path *path, PlannerInfo *root, int numGroupCols, double numGroups, List *quals, Cost input_startup_cost, Cost input_total_cost, - double input_tuples) + double input_tuples, double input_width) { double output_tuples; Cost startup_cost; @@ -2228,14 +2231,79 @@ cost_agg(Path *path, PlannerInfo *root, startup_cost += disable_cost; startup_cost += aggcosts->transCost.startup; startup_cost += aggcosts->transCost.per_tuple * input_tuples; + /* cost of computing hash value */ startup_cost += (cpu_operator_cost * numGroupCols) * input_tuples; startup_cost += aggcosts->finalCost.startup; + total_cost = startup_cost; total_cost += aggcosts->finalCost.per_tuple * numGroups; + /* cost of retrieving from hash table */ total_cost += cpu_tuple_cost * numGroups; output_tuples = numGroups; } + /* + * Add the disk costs of hash aggregation that spills to disk. + * + * Groups that go into the hash table stay in memory until finalized, + * so spilling and reprocessing tuples doesn't incur additional + * invocations of transCost or finalCost. Furthermore, the computed + * hash value is stored with the spilled tuples, so we don't incur + * extra invocations of the hash function. + * + * Hash Agg begins returning tuples after the first batch is + * complete. Accrue writes (spilled tuples) to startup_cost and to + * total_cost; accrue reads only to total_cost. + */ + if (aggstrategy == AGG_HASHED || aggstrategy == AGG_MIXED) + { + double pages_written = 0.0; + double pages_read = 0.0; + double hashentrysize; + double nbatches; + Size mem_limit; + uint64 ngroups_limit; + int num_partitions; + + + /* + * Estimate number of batches based on the computed limits. If less + * than or equal to one, all groups are expected to fit in memory; + * otherwise we expect to spill. 
+ */ + hashentrysize = hash_agg_entry_size( + aggcosts->numAggs, input_width, aggcosts->transitionSpace); + hash_agg_set_limits(hashentrysize, numGroups, 0, &mem_limit, + &ngroups_limit, &num_partitions); + + nbatches = Max( (numGroups * hashentrysize) / mem_limit, + numGroups / ngroups_limit ); + + /* + * Estimate number of pages read and written. For each level of + * recursion, a tuple must be written and then later read. + */ + if (nbatches > 1.0) + { + double depth; + double pages; + + pages = relation_byte_size(input_tuples, input_width) / BLCKSZ; + + /* + * The number of partitions can change at different levels of + * recursion; but for the purposes of this calculation assume it + * stays constant. + */ + depth = ceil( log(nbatches - 1) / log(num_partitions) ); + pages_written = pages_read = pages * depth; + } + + startup_cost += pages_written * random_page_cost; + total_cost += pages_written * random_page_cost; + total_cost += pages_read * seq_page_cost; + } + /* * If there are quals (HAVING quals), account for their cost and * selectivity. diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index b44efd6314..eb25c2f470 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -4258,11 +4258,12 @@ consider_groupingsets_paths(PlannerInfo *root, dNumGroups - exclude_groups); /* - * gd->rollups is empty if we have only unsortable columns to work - * with. Override work_mem in that case; otherwise, we'll rely on the - * sorted-input case to generate usable mixed paths. + * If we have sortable columns to work with (gd->rollups is non-empty) + * and enable_groupingsets_hash_disk is disabled, don't generate + * hash-based paths that will exceed work_mem. */ - if (hashsize > work_mem * 1024L && gd->rollups) + if (!enable_groupingsets_hash_disk && + hashsize > work_mem * 1024L && gd->rollups) return; /* nope, won't fit */ /* @@ -6528,7 +6529,8 @@ add_paths_to_grouping_rel(PlannerInfo *root, RelOptInfo *input_rel, * were unable to sort above, then we'd better generate a Path, so * that we at least have one. */ - if (hashaggtablesize < work_mem * 1024L || + if (enable_hashagg_disk || + hashaggtablesize < work_mem * 1024L || grouped_rel->pathlist == NIL) { /* @@ -6561,7 +6563,8 @@ add_paths_to_grouping_rel(PlannerInfo *root, RelOptInfo *input_rel, agg_final_costs, dNumGroups); - if (hashaggtablesize < work_mem * 1024L) + if (enable_hashagg_disk || + hashaggtablesize < work_mem * 1024L) add_path(grouped_rel, (Path *) create_agg_path(root, grouped_rel, @@ -6830,7 +6833,7 @@ create_partial_grouping_paths(PlannerInfo *root, * Tentatively produce a partial HashAgg Path, depending on if it * looks as if the hash table will fit in work_mem. */ - if (hashaggtablesize < work_mem * 1024L && + if ((enable_hashagg_disk || hashaggtablesize < work_mem * 1024L) && cheapest_total_path != NULL) { add_path(partially_grouped_rel, (Path *) @@ -6857,7 +6860,7 @@ create_partial_grouping_paths(PlannerInfo *root, dNumPartialPartialGroups); /* Do the same for partial paths. 
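To see how the new cost_agg() terms behave, here is a standalone sketch of the spill I/O estimate with illustrative inputs; it reuses the nbatches and recursion-depth formulas above, approximates relation_byte_size() as tuples times width, and assumes the default page cost settings:

#include <math.h>
#include <stdio.h>

#define BLCKSZ 8192

int
main(void)
{
    double numGroups = 1000000.0;      /* planner's group estimate */
    double input_tuples = 2000000.0;
    double input_width = 100.0;        /* bytes per input tuple (illustrative) */
    double hashentrysize = 200.0;
    double mem_limit = 3145728.0;      /* assume hash_agg_set_limits() returned these */
    double ngroups_limit = 15728.0;
    double num_partitions = 128.0;
    double seq_page_cost = 1.0;        /* default GUC settings */
    double random_page_cost = 4.0;
    double nbatches;
    double pages_written = 0.0;
    double pages_read = 0.0;

    nbatches = fmax((numGroups * hashentrysize) / mem_limit,
                    numGroups / ngroups_limit);

    if (nbatches > 1.0)
    {
        /* every recursion level writes the spilled input once and reads it back */
        double pages = (input_tuples * input_width) / BLCKSZ;
        double depth = ceil(log(nbatches - 1) / log(num_partitions));

        pages_written = pages_read = pages * depth;
    }

    printf("spill I/O estimate: %.0f pages written, %.0f pages read\n",
           pages_written, pages_read);
    printf("added cost: startup += %.0f, total += %.0f\n",
           pages_written * random_page_cost,
           pages_written * random_page_cost + pages_read * seq_page_cost);
    return 0;
}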
*/ - if (hashaggtablesize < work_mem * 1024L && + if ((enable_hashagg_disk || hashaggtablesize < work_mem * 1024L) && cheapest_partial_path != NULL) { add_partial_path(partially_grouped_rel, (Path *) diff --git a/src/backend/optimizer/prep/prepunion.c b/src/backend/optimizer/prep/prepunion.c index 1a23e18970..951aed80e7 100644 --- a/src/backend/optimizer/prep/prepunion.c +++ b/src/backend/optimizer/prep/prepunion.c @@ -1072,7 +1072,7 @@ choose_hashed_setop(PlannerInfo *root, List *groupClauses, numGroupCols, dNumGroups, NIL, input_path->startup_cost, input_path->total_cost, - input_path->rows); + input_path->rows, input_path->pathtarget->width); /* * Now for the sorted case. Note that the input is *always* unsorted, diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index d9ce516211..8ba8122ee2 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -1704,7 +1704,8 @@ create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, NIL, subpath->startup_cost, subpath->total_cost, - rel->rows); + rel->rows, + subpath->pathtarget->width); } if (sjinfo->semi_can_btree && sjinfo->semi_can_hash) @@ -2958,7 +2959,7 @@ create_agg_path(PlannerInfo *root, list_length(groupClause), numGroups, qual, subpath->startup_cost, subpath->total_cost, - subpath->rows); + subpath->rows, subpath->pathtarget->width); /* add tlist eval cost for each output row */ pathnode->path.startup_cost += target->cost.startup; @@ -3069,7 +3070,8 @@ create_groupingsets_path(PlannerInfo *root, having_qual, subpath->startup_cost, subpath->total_cost, - subpath->rows); + subpath->rows, + subpath->pathtarget->width); is_first = false; if (!rollup->is_hashed) is_first_sort = false; @@ -3092,7 +3094,8 @@ create_groupingsets_path(PlannerInfo *root, rollup->numGroups, having_qual, 0.0, 0.0, - subpath->rows); + subpath->rows, + subpath->pathtarget->width); if (!rollup->is_hashed) is_first_sort = false; } @@ -3117,7 +3120,8 @@ create_groupingsets_path(PlannerInfo *root, having_qual, sort_path.startup_cost, sort_path.total_cost, - sort_path.rows); + sort_path.rows, + subpath->pathtarget->width); } pathnode->path.total_cost += agg_path.total_cost; diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 68082315ac..af876d1f01 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -999,6 +999,26 @@ static struct config_bool ConfigureNamesBool[] = true, NULL, NULL, NULL }, + { + {"enable_hashagg_disk", PGC_USERSET, QUERY_TUNING_METHOD, + gettext_noop("Enables the planner's use of hashed aggregation plans that are expected to exceed work_mem."), + NULL, + GUC_EXPLAIN + }, + &enable_hashagg_disk, + true, + NULL, NULL, NULL + }, + { + {"enable_groupingsets_hash_disk", PGC_USERSET, QUERY_TUNING_METHOD, + gettext_noop("Enables the planner's use of hashed aggregation plans for groupingsets when the total size of the hash tables is expected to exceed work_mem."), + NULL, + GUC_EXPLAIN + }, + &enable_groupingsets_hash_disk, + false, + NULL, NULL, NULL + }, { {"enable_material", PGC_USERSET, QUERY_TUNING_METHOD, gettext_noop("Enables the planner's use of materialization."), diff --git a/src/include/executor/nodeAgg.h b/src/include/executor/nodeAgg.h index 264916f9a9..a5b8a004d1 100644 --- a/src/include/executor/nodeAgg.h +++ b/src/include/executor/nodeAgg.h @@ -280,6 +280,11 @@ typedef struct AggStatePerPhaseData Sort *sortnode; /* Sort node for input ordering for phase */ ExprState *evaltrans; /* evaluation of 
transition functions */ + + /* cached variants of the compiled expression */ + ExprState *evaltrans_cache + [2] /* 0: outerops; 1: TTSOpsMinimalTuple */ + [2]; /* 0: no NULL check; 1: with NULL check */ } AggStatePerPhaseData; /* @@ -311,5 +316,8 @@ extern void ExecReScanAgg(AggState *node); extern Size hash_agg_entry_size(int numAggs, Size tupleWidth, Size transitionSpace); +extern void hash_agg_set_limits(double hashentrysize, uint64 input_groups, + int used_bits, Size *mem_limit, + uint64 *ngroups_limit, int *num_partitions); #endif /* NODEAGG_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index cd3ddf781f..3d27d50f09 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -2079,12 +2079,32 @@ typedef struct AggState /* these fields are used in AGG_HASHED and AGG_MIXED modes: */ bool table_filled; /* hash table filled yet? */ int num_hashes; + MemoryContext hash_metacxt; /* memory for hash table itself */ + struct HashTapeInfo *hash_tapeinfo; /* metadata for spill tapes */ + struct HashAggSpill *hash_spills; /* HashAggSpill for each grouping set, + exists only during first pass */ + TupleTableSlot *hash_spill_slot; /* slot for reading from spill files */ + List *hash_batches; /* hash batches remaining to be processed */ + bool hash_ever_spilled; /* ever spilled during this execution? */ + bool hash_spill_mode; /* we hit a limit during the current batch + and we must not create new groups */ + Size hash_mem_limit; /* limit before spilling hash table */ + uint64 hash_ngroups_limit; /* limit before spilling hash table */ + int hash_planned_partitions; /* number of partitions planned + for first pass */ + double hashentrysize; /* estimate revised during execution */ + Size hash_mem_peak; /* peak hash table memory usage */ + uint64 hash_ngroups_current; /* number of groups currently in + memory in all hash tables */ + uint64 hash_disk_used; /* kB of disk space used */ + int hash_batches_used; /* batches used during entire execution */ + AggStatePerHash perhash; /* array of per-hashtable data */ AggStatePerGroup *hash_pergroup; /* grouping set indexed array of * per-group pointers */ /* support for evaluation of agg input expressions: */ -#define FIELDNO_AGGSTATE_ALL_PERGROUPS 34 +#define FIELDNO_AGGSTATE_ALL_PERGROUPS 49 AggStatePerGroup *all_pergroups; /* array of first ->pergroups, than * ->hash_pergroup */ ProjectionInfo *combinedproj; /* projection machinery */ diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index cb012ba198..735ba09650 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -54,6 +54,8 @@ extern PGDLLIMPORT bool enable_bitmapscan; extern PGDLLIMPORT bool enable_tidscan; extern PGDLLIMPORT bool enable_sort; extern PGDLLIMPORT bool enable_hashagg; +extern PGDLLIMPORT bool enable_hashagg_disk; +extern PGDLLIMPORT bool enable_groupingsets_hash_disk; extern PGDLLIMPORT bool enable_nestloop; extern PGDLLIMPORT bool enable_material; extern PGDLLIMPORT bool enable_mergejoin; @@ -114,7 +116,7 @@ extern void cost_agg(Path *path, PlannerInfo *root, int numGroupCols, double numGroups, List *quals, Cost input_startup_cost, Cost input_total_cost, - double input_tuples); + double input_tuples, double input_width); extern void cost_windowagg(Path *path, PlannerInfo *root, List *windowFuncs, int numPartCols, int numOrderCols, Cost input_startup_cost, Cost input_total_cost, diff --git a/src/test/regress/expected/aggregates.out b/src/test/regress/expected/aggregates.out index 
f457b5b150..0073072a36 100644 --- a/src/test/regress/expected/aggregates.out +++ b/src/test/regress/expected/aggregates.out @@ -2357,3 +2357,187 @@ explain (costs off) -> Seq Scan on onek (8 rows) +-- +-- Hash Aggregation Spill tests +-- +set enable_sort=false; +set work_mem='64kB'; +select unique1, count(*), sum(twothousand) from tenk1 +group by unique1 +having sum(fivethous) > 4975 +order by sum(twothousand); + unique1 | count | sum +---------+-------+------ + 4976 | 1 | 976 + 4977 | 1 | 977 + 4978 | 1 | 978 + 4979 | 1 | 979 + 4980 | 1 | 980 + 4981 | 1 | 981 + 4982 | 1 | 982 + 4983 | 1 | 983 + 4984 | 1 | 984 + 4985 | 1 | 985 + 4986 | 1 | 986 + 4987 | 1 | 987 + 4988 | 1 | 988 + 4989 | 1 | 989 + 4990 | 1 | 990 + 4991 | 1 | 991 + 4992 | 1 | 992 + 4993 | 1 | 993 + 4994 | 1 | 994 + 4995 | 1 | 995 + 4996 | 1 | 996 + 4997 | 1 | 997 + 4998 | 1 | 998 + 4999 | 1 | 999 + 9976 | 1 | 1976 + 9977 | 1 | 1977 + 9978 | 1 | 1978 + 9979 | 1 | 1979 + 9980 | 1 | 1980 + 9981 | 1 | 1981 + 9982 | 1 | 1982 + 9983 | 1 | 1983 + 9984 | 1 | 1984 + 9985 | 1 | 1985 + 9986 | 1 | 1986 + 9987 | 1 | 1987 + 9988 | 1 | 1988 + 9989 | 1 | 1989 + 9990 | 1 | 1990 + 9991 | 1 | 1991 + 9992 | 1 | 1992 + 9993 | 1 | 1993 + 9994 | 1 | 1994 + 9995 | 1 | 1995 + 9996 | 1 | 1996 + 9997 | 1 | 1997 + 9998 | 1 | 1998 + 9999 | 1 | 1999 +(48 rows) + +set work_mem to default; +set enable_sort to default; +-- +-- Compare results between plans using sorting and plans using hash +-- aggregation. Force spilling in both cases by setting work_mem low. +-- +set work_mem='64kB'; +-- Produce results with sorting. +set enable_hashagg = false; +set jit_above_cost = 0; +explain (costs off) +select g%100000 as c1, sum(g::numeric) as c2, count(*) as c3 + from generate_series(0, 199999) g + group by g%100000; + QUERY PLAN +------------------------------------------------ + GroupAggregate + Group Key: ((g % 100000)) + -> Sort + Sort Key: ((g % 100000)) + -> Function Scan on generate_series g +(5 rows) + +create table agg_group_1 as +select g%100000 as c1, sum(g::numeric) as c2, count(*) as c3 + from generate_series(0, 199999) g + group by g%100000; +create table agg_group_2 as +select * from + (values (100), (300), (500)) as r(a), + lateral ( + select (g/2)::numeric as c1, + array_agg(g::numeric) as c2, + count(*) as c3 + from generate_series(0, 1999) g + where g < r.a + group by g/2) as s; +set jit_above_cost to default; +create table agg_group_3 as +select (g/2)::numeric as c1, sum(7::int4) as c2, count(*) as c3 + from generate_series(0, 1999) g + group by g/2; +create table agg_group_4 as +select (g/2)::numeric as c1, array_agg(g::numeric) as c2, count(*) as c3 + from generate_series(0, 1999) g + group by g/2; +-- Produce results with hash aggregation +set enable_hashagg = true; +set enable_sort = false; +set jit_above_cost = 0; +explain (costs off) +select g%100000 as c1, sum(g::numeric) as c2, count(*) as c3 + from generate_series(0, 199999) g + group by g%100000; + QUERY PLAN +------------------------------------------ + HashAggregate + Group Key: (g % 100000) + -> Function Scan on generate_series g +(3 rows) + +create table agg_hash_1 as +select g%100000 as c1, sum(g::numeric) as c2, count(*) as c3 + from generate_series(0, 199999) g + group by g%100000; +create table agg_hash_2 as +select * from + (values (100), (300), (500)) as r(a), + lateral ( + select (g/2)::numeric as c1, + array_agg(g::numeric) as c2, + count(*) as c3 + from generate_series(0, 1999) g + where g < r.a + group by g/2) as s; +set jit_above_cost to default; +create table agg_hash_3 
as +select (g/2)::numeric as c1, sum(7::int4) as c2, count(*) as c3 + from generate_series(0, 1999) g + group by g/2; +create table agg_hash_4 as +select (g/2)::numeric as c1, array_agg(g::numeric) as c2, count(*) as c3 + from generate_series(0, 1999) g + group by g/2; +set enable_sort = true; +set work_mem to default; +-- Compare group aggregation results to hash aggregation results +(select * from agg_hash_1 except select * from agg_group_1) + union all +(select * from agg_group_1 except select * from agg_hash_1); + c1 | c2 | c3 +----+----+---- +(0 rows) + +(select * from agg_hash_2 except select * from agg_group_2) + union all +(select * from agg_group_2 except select * from agg_hash_2); + a | c1 | c2 | c3 +---+----+----+---- +(0 rows) + +(select * from agg_hash_3 except select * from agg_group_3) + union all +(select * from agg_group_3 except select * from agg_hash_3); + c1 | c2 | c3 +----+----+---- +(0 rows) + +(select * from agg_hash_4 except select * from agg_group_4) + union all +(select * from agg_group_4 except select * from agg_hash_4); + c1 | c2 | c3 +----+----+---- +(0 rows) + +drop table agg_group_1; +drop table agg_group_2; +drop table agg_group_3; +drop table agg_group_4; +drop table agg_hash_1; +drop table agg_hash_2; +drop table agg_hash_3; +drop table agg_hash_4; diff --git a/src/test/regress/expected/groupingsets.out b/src/test/regress/expected/groupingsets.out index c1f802c88a..dbe5140b55 100644 --- a/src/test/regress/expected/groupingsets.out +++ b/src/test/regress/expected/groupingsets.out @@ -1633,4 +1633,126 @@ select v||'a', case when grouping(v||'a') = 1 then 1 else 0 end, count(*) | 1 | 2 (4 rows) +-- +-- Compare results between plans using sorting and plans using hash +-- aggregation. Force spilling in both cases by setting work_mem low +-- and turning on enable_groupingsets_hash_disk. +-- +SET enable_groupingsets_hash_disk = true; +SET work_mem='64kB'; +-- Produce results with sorting. +set enable_hashagg = false; +set jit_above_cost = 0; +explain (costs off) +select g1000, g100, g10, sum(g::numeric), count(*), max(g::text) from + (select g%1000 as g1000, g%100 as g100, g%10 as g10, g + from generate_series(0,199999) g) s +group by cube (g1000,g100,g10); + QUERY PLAN +--------------------------------------------------------------- + GroupAggregate + Group Key: ((g.g % 1000)), ((g.g % 100)), ((g.g % 10)) + Group Key: ((g.g % 1000)), ((g.g % 100)) + Group Key: ((g.g % 1000)) + Group Key: () + Sort Key: ((g.g % 100)), ((g.g % 10)) + Group Key: ((g.g % 100)), ((g.g % 10)) + Group Key: ((g.g % 100)) + Sort Key: ((g.g % 10)), ((g.g % 1000)) + Group Key: ((g.g % 10)), ((g.g % 1000)) + Group Key: ((g.g % 10)) + -> Sort + Sort Key: ((g.g % 1000)), ((g.g % 100)), ((g.g % 10)) + -> Function Scan on generate_series g +(14 rows) + +create table gs_group_1 as +select g1000, g100, g10, sum(g::numeric), count(*), max(g::text) from + (select g%1000 as g1000, g%100 as g100, g%10 as g10, g + from generate_series(0,199999) g) s +group by cube (g1000,g100,g10); +set jit_above_cost to default; +create table gs_group_2 as +select g1000, g100, g10, sum(g::numeric), count(*), max(g::text) from + (select g/20 as g1000, g/200 as g100, g/2000 as g10, g + from generate_series(0,19999) g) s +group by cube (g1000,g100,g10); +create table gs_group_3 as +select g100, g10, array_agg(g) as a, count(*) as c, max(g::text) as m from + (select g/200 as g100, g/2000 as g10, g + from generate_series(0,19999) g) s +group by grouping sets (g100,g10); +-- Produce results with hash aggregation. 
+set enable_hashagg = true; +set enable_sort = false; +set work_mem='64kB'; +set jit_above_cost = 0; +explain (costs off) +select g1000, g100, g10, sum(g::numeric), count(*), max(g::text) from + (select g%1000 as g1000, g%100 as g100, g%10 as g10, g + from generate_series(0,199999) g) s +group by cube (g1000,g100,g10); + QUERY PLAN +--------------------------------------------------- + MixedAggregate + Hash Key: (g.g % 1000), (g.g % 100), (g.g % 10) + Hash Key: (g.g % 1000), (g.g % 100) + Hash Key: (g.g % 1000) + Hash Key: (g.g % 100), (g.g % 10) + Hash Key: (g.g % 100) + Hash Key: (g.g % 10), (g.g % 1000) + Hash Key: (g.g % 10) + Group Key: () + -> Function Scan on generate_series g +(10 rows) + +create table gs_hash_1 as +select g1000, g100, g10, sum(g::numeric), count(*), max(g::text) from + (select g%1000 as g1000, g%100 as g100, g%10 as g10, g + from generate_series(0,199999) g) s +group by cube (g1000,g100,g10); +set jit_above_cost to default; +create table gs_hash_2 as +select g1000, g100, g10, sum(g::numeric), count(*), max(g::text) from + (select g/20 as g1000, g/200 as g100, g/2000 as g10, g + from generate_series(0,19999) g) s +group by cube (g1000,g100,g10); +create table gs_hash_3 as +select g100, g10, array_agg(g) as a, count(*) as c, max(g::text) as m from + (select g/200 as g100, g/2000 as g10, g + from generate_series(0,19999) g) s +group by grouping sets (g100,g10); +set enable_sort = true; +set work_mem to default; +-- Compare results +(select * from gs_hash_1 except select * from gs_group_1) + union all +(select * from gs_group_1 except select * from gs_hash_1); + g1000 | g100 | g10 | sum | count | max +-------+------+-----+-----+-------+----- +(0 rows) + +(select * from gs_hash_2 except select * from gs_group_2) + union all +(select * from gs_group_2 except select * from gs_hash_2); + g1000 | g100 | g10 | sum | count | max +-------+------+-----+-----+-------+----- +(0 rows) + +(select g100,g10,unnest(a),c,m from gs_hash_3 except + select g100,g10,unnest(a),c,m from gs_group_3) + union all +(select g100,g10,unnest(a),c,m from gs_group_3 except + select g100,g10,unnest(a),c,m from gs_hash_3); + g100 | g10 | unnest | c | m +------+-----+--------+---+--- +(0 rows) + +drop table gs_group_1; +drop table gs_group_2; +drop table gs_group_3; +drop table gs_hash_1; +drop table gs_hash_2; +drop table gs_hash_3; +SET enable_groupingsets_hash_disk TO DEFAULT; -- end diff --git a/src/test/regress/expected/select_distinct.out b/src/test/regress/expected/select_distinct.out index f3696c6d1d..11c6f50fbf 100644 --- a/src/test/regress/expected/select_distinct.out +++ b/src/test/regress/expected/select_distinct.out @@ -148,6 +148,68 @@ SELECT count(*) FROM 4 (1 row) +-- +-- Compare results between plans using sorting and plans using hash +-- aggregation. Force spilling in both cases by setting work_mem low. +-- +SET work_mem='64kB'; +-- Produce results with sorting. +SET enable_hashagg=FALSE; +SET jit_above_cost=0; +EXPLAIN (costs off) +SELECT DISTINCT g%1000 FROM generate_series(0,9999) g; + QUERY PLAN +------------------------------------------------ + Unique + -> Sort + Sort Key: ((g % 1000)) + -> Function Scan on generate_series g +(4 rows) + +CREATE TABLE distinct_group_1 AS +SELECT DISTINCT g%1000 FROM generate_series(0,9999) g; +SET jit_above_cost TO DEFAULT; +CREATE TABLE distinct_group_2 AS +SELECT DISTINCT (g%1000)::text FROM generate_series(0,9999) g; +SET enable_hashagg=TRUE; +-- Produce results with hash aggregation. 
+SET enable_sort=FALSE; +SET jit_above_cost=0; +EXPLAIN (costs off) +SELECT DISTINCT g%1000 FROM generate_series(0,9999) g; + QUERY PLAN +------------------------------------------ + HashAggregate + Group Key: (g % 1000) + -> Function Scan on generate_series g +(3 rows) + +CREATE TABLE distinct_hash_1 AS +SELECT DISTINCT g%1000 FROM generate_series(0,9999) g; +SET jit_above_cost TO DEFAULT; +CREATE TABLE distinct_hash_2 AS +SELECT DISTINCT (g%1000)::text FROM generate_series(0,9999) g; +SET enable_sort=TRUE; +SET work_mem TO DEFAULT; +-- Compare results +(SELECT * FROM distinct_hash_1 EXCEPT SELECT * FROM distinct_group_1) + UNION ALL +(SELECT * FROM distinct_group_1 EXCEPT SELECT * FROM distinct_hash_1); + ?column? +---------- +(0 rows) + +(SELECT * FROM distinct_hash_2 EXCEPT SELECT * FROM distinct_group_2) + UNION ALL +(SELECT * FROM distinct_group_2 EXCEPT SELECT * FROM distinct_hash_2); + ?column? +---------- +(0 rows) + +DROP TABLE distinct_hash_1; +DROP TABLE distinct_hash_2; +DROP TABLE distinct_group_1; +DROP TABLE distinct_group_2; -- -- Also, some tests of IS DISTINCT FROM, which doesn't quite deserve its -- very own regression file. diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out index a1c90eb905..715842b87a 100644 --- a/src/test/regress/expected/sysviews.out +++ b/src/test/regress/expected/sysviews.out @@ -74,7 +74,9 @@ select name, setting from pg_settings where name like 'enable%'; --------------------------------+--------- enable_bitmapscan | on enable_gathermerge | on + enable_groupingsets_hash_disk | off enable_hashagg | on + enable_hashagg_disk | on enable_hashjoin | on enable_indexonlyscan | on enable_indexscan | on @@ -89,7 +91,7 @@ select name, setting from pg_settings where name like 'enable%'; enable_seqscan | on enable_sort | on enable_tidscan | on -(17 rows) +(19 rows) -- Test that the pg_timezone_names and pg_timezone_abbrevs views are -- more-or-less working. We can't test their contents in any great detail diff --git a/src/test/regress/sql/aggregates.sql b/src/test/regress/sql/aggregates.sql index 3e593f2d61..02578330a6 100644 --- a/src/test/regress/sql/aggregates.sql +++ b/src/test/regress/sql/aggregates.sql @@ -1032,3 +1032,134 @@ select v||'a', case when v||'a' = 'aa' then 1 else 0 end, count(*) explain (costs off) select 1 from tenk1 where (hundred, thousand) in (select twothousand, twothousand from onek); + +-- +-- Hash Aggregation Spill tests +-- + +set enable_sort=false; +set work_mem='64kB'; + +select unique1, count(*), sum(twothousand) from tenk1 +group by unique1 +having sum(fivethous) > 4975 +order by sum(twothousand); + +set work_mem to default; +set enable_sort to default; + +-- +-- Compare results between plans using sorting and plans using hash +-- aggregation. Force spilling in both cases by setting work_mem low. +-- + +set work_mem='64kB'; + +-- Produce results with sorting. 
+ +set enable_hashagg = false; + +set jit_above_cost = 0; + +explain (costs off) +select g%100000 as c1, sum(g::numeric) as c2, count(*) as c3 + from generate_series(0, 199999) g + group by g%100000; + +create table agg_group_1 as +select g%100000 as c1, sum(g::numeric) as c2, count(*) as c3 + from generate_series(0, 199999) g + group by g%100000; + +create table agg_group_2 as +select * from + (values (100), (300), (500)) as r(a), + lateral ( + select (g/2)::numeric as c1, + array_agg(g::numeric) as c2, + count(*) as c3 + from generate_series(0, 1999) g + where g < r.a + group by g/2) as s; + +set jit_above_cost to default; + +create table agg_group_3 as +select (g/2)::numeric as c1, sum(7::int4) as c2, count(*) as c3 + from generate_series(0, 1999) g + group by g/2; + +create table agg_group_4 as +select (g/2)::numeric as c1, array_agg(g::numeric) as c2, count(*) as c3 + from generate_series(0, 1999) g + group by g/2; + +-- Produce results with hash aggregation + +set enable_hashagg = true; +set enable_sort = false; + +set jit_above_cost = 0; + +explain (costs off) +select g%100000 as c1, sum(g::numeric) as c2, count(*) as c3 + from generate_series(0, 199999) g + group by g%100000; + +create table agg_hash_1 as +select g%100000 as c1, sum(g::numeric) as c2, count(*) as c3 + from generate_series(0, 199999) g + group by g%100000; + +create table agg_hash_2 as +select * from + (values (100), (300), (500)) as r(a), + lateral ( + select (g/2)::numeric as c1, + array_agg(g::numeric) as c2, + count(*) as c3 + from generate_series(0, 1999) g + where g < r.a + group by g/2) as s; + +set jit_above_cost to default; + +create table agg_hash_3 as +select (g/2)::numeric as c1, sum(7::int4) as c2, count(*) as c3 + from generate_series(0, 1999) g + group by g/2; + +create table agg_hash_4 as +select (g/2)::numeric as c1, array_agg(g::numeric) as c2, count(*) as c3 + from generate_series(0, 1999) g + group by g/2; + +set enable_sort = true; +set work_mem to default; + +-- Compare group aggregation results to hash aggregation results + +(select * from agg_hash_1 except select * from agg_group_1) + union all +(select * from agg_group_1 except select * from agg_hash_1); + +(select * from agg_hash_2 except select * from agg_group_2) + union all +(select * from agg_group_2 except select * from agg_hash_2); + +(select * from agg_hash_3 except select * from agg_group_3) + union all +(select * from agg_group_3 except select * from agg_hash_3); + +(select * from agg_hash_4 except select * from agg_group_4) + union all +(select * from agg_group_4 except select * from agg_hash_4); + +drop table agg_group_1; +drop table agg_group_2; +drop table agg_group_3; +drop table agg_group_4; +drop table agg_hash_1; +drop table agg_hash_2; +drop table agg_hash_3; +drop table agg_hash_4; diff --git a/src/test/regress/sql/groupingsets.sql b/src/test/regress/sql/groupingsets.sql index 95ac3fb52f..478f49ecab 100644 --- a/src/test/regress/sql/groupingsets.sql +++ b/src/test/regress/sql/groupingsets.sql @@ -441,4 +441,107 @@ select v||'a', case when grouping(v||'a') = 1 then 1 else 0 end, count(*) from unnest(array[1,1], array['a','b']) u(i,v) group by rollup(i, v||'a') order by 1,3; +-- +-- Compare results between plans using sorting and plans using hash +-- aggregation. Force spilling in both cases by setting work_mem low +-- and turning on enable_groupingsets_hash_disk. +-- + +SET enable_groupingsets_hash_disk = true; +SET work_mem='64kB'; + +-- Produce results with sorting. 
+ +set enable_hashagg = false; + +set jit_above_cost = 0; + +explain (costs off) +select g1000, g100, g10, sum(g::numeric), count(*), max(g::text) from + (select g%1000 as g1000, g%100 as g100, g%10 as g10, g + from generate_series(0,199999) g) s +group by cube (g1000,g100,g10); + +create table gs_group_1 as +select g1000, g100, g10, sum(g::numeric), count(*), max(g::text) from + (select g%1000 as g1000, g%100 as g100, g%10 as g10, g + from generate_series(0,199999) g) s +group by cube (g1000,g100,g10); + +set jit_above_cost to default; + +create table gs_group_2 as +select g1000, g100, g10, sum(g::numeric), count(*), max(g::text) from + (select g/20 as g1000, g/200 as g100, g/2000 as g10, g + from generate_series(0,19999) g) s +group by cube (g1000,g100,g10); + +create table gs_group_3 as +select g100, g10, array_agg(g) as a, count(*) as c, max(g::text) as m from + (select g/200 as g100, g/2000 as g10, g + from generate_series(0,19999) g) s +group by grouping sets (g100,g10); + +-- Produce results with hash aggregation. + +set enable_hashagg = true; +set enable_sort = false; +set work_mem='64kB'; + +set jit_above_cost = 0; + +explain (costs off) +select g1000, g100, g10, sum(g::numeric), count(*), max(g::text) from + (select g%1000 as g1000, g%100 as g100, g%10 as g10, g + from generate_series(0,199999) g) s +group by cube (g1000,g100,g10); + +create table gs_hash_1 as +select g1000, g100, g10, sum(g::numeric), count(*), max(g::text) from + (select g%1000 as g1000, g%100 as g100, g%10 as g10, g + from generate_series(0,199999) g) s +group by cube (g1000,g100,g10); + +set jit_above_cost to default; + +create table gs_hash_2 as +select g1000, g100, g10, sum(g::numeric), count(*), max(g::text) from + (select g/20 as g1000, g/200 as g100, g/2000 as g10, g + from generate_series(0,19999) g) s +group by cube (g1000,g100,g10); + +create table gs_hash_3 as +select g100, g10, array_agg(g) as a, count(*) as c, max(g::text) as m from + (select g/200 as g100, g/2000 as g10, g + from generate_series(0,19999) g) s +group by grouping sets (g100,g10); + +set enable_sort = true; +set work_mem to default; + +-- Compare results + +(select * from gs_hash_1 except select * from gs_group_1) + union all +(select * from gs_group_1 except select * from gs_hash_1); + +(select * from gs_hash_2 except select * from gs_group_2) + union all +(select * from gs_group_2 except select * from gs_hash_2); + +(select g100,g10,unnest(a),c,m from gs_hash_3 except + select g100,g10,unnest(a),c,m from gs_group_3) + union all +(select g100,g10,unnest(a),c,m from gs_group_3 except + select g100,g10,unnest(a),c,m from gs_hash_3); + +drop table gs_group_1; +drop table gs_group_2; +drop table gs_group_3; +drop table gs_hash_1; +drop table gs_hash_2; +drop table gs_hash_3; + +SET enable_groupingsets_hash_disk TO DEFAULT; + -- end diff --git a/src/test/regress/sql/select_distinct.sql b/src/test/regress/sql/select_distinct.sql index a605e86449..33102744eb 100644 --- a/src/test/regress/sql/select_distinct.sql +++ b/src/test/regress/sql/select_distinct.sql @@ -45,6 +45,68 @@ SELECT count(*) FROM SELECT count(*) FROM (SELECT DISTINCT two, four, two FROM tenk1) ss; +-- +-- Compare results between plans using sorting and plans using hash +-- aggregation. Force spilling in both cases by setting work_mem low. +-- + +SET work_mem='64kB'; + +-- Produce results with sorting. 
+ +SET enable_hashagg=FALSE; + +SET jit_above_cost=0; + +EXPLAIN (costs off) +SELECT DISTINCT g%1000 FROM generate_series(0,9999) g; + +CREATE TABLE distinct_group_1 AS +SELECT DISTINCT g%1000 FROM generate_series(0,9999) g; + +SET jit_above_cost TO DEFAULT; + +CREATE TABLE distinct_group_2 AS +SELECT DISTINCT (g%1000)::text FROM generate_series(0,9999) g; + +SET enable_hashagg=TRUE; + +-- Produce results with hash aggregation. + +SET enable_sort=FALSE; + +SET jit_above_cost=0; + +EXPLAIN (costs off) +SELECT DISTINCT g%1000 FROM generate_series(0,9999) g; + +CREATE TABLE distinct_hash_1 AS +SELECT DISTINCT g%1000 FROM generate_series(0,9999) g; + +SET jit_above_cost TO DEFAULT; + +CREATE TABLE distinct_hash_2 AS +SELECT DISTINCT (g%1000)::text FROM generate_series(0,9999) g; + +SET enable_sort=TRUE; + +SET work_mem TO DEFAULT; + +-- Compare results + +(SELECT * FROM distinct_hash_1 EXCEPT SELECT * FROM distinct_group_1) + UNION ALL +(SELECT * FROM distinct_group_1 EXCEPT SELECT * FROM distinct_hash_1); + +(SELECT * FROM distinct_hash_2 EXCEPT SELECT * FROM distinct_group_2) + UNION ALL +(SELECT * FROM distinct_group_2 EXCEPT SELECT * FROM distinct_hash_2); + +DROP TABLE distinct_hash_1; +DROP TABLE distinct_hash_2; +DROP TABLE distinct_group_1; +DROP TABLE distinct_group_2; + -- -- Also, some tests of IS DISTINCT FROM, which doesn't quite deserve its -- very own regression file.
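
For reviewers who want to exercise the spill path interactively rather than through the regression suite, the following is a minimal sketch distilled from the tests above. It is illustrative only and not part of the patch: the temp table name spill_demo, the column names k and v, and the row/group counts are assumptions chosen to make roughly 50,000 groups overflow a 64kB work_mem; actual memory, disk, and batch figures reported will vary by build and configuration.

-- Illustrative only; spill_demo and the constants below are made up for this sketch.
SET enable_hashagg_disk = on;   -- planner may still choose HashAgg past work_mem (default on per guc.c)
SET enable_sort = off;          -- steer the planner toward hash aggregation
SET work_mem = '64kB';          -- small limit so the hash table exceeds it and spills

CREATE TEMP TABLE spill_demo AS
SELECT g % 50000 AS k, g AS v FROM generate_series(0, 199999) g;

-- EXPLAIN ANALYZE should surface the new per-node instrumentation kept in
-- AggState (hash_mem_peak, hash_disk_used, hash_batches_used,
-- hash_planned_partitions from the execnodes.h hunk) for the HashAggregate node.
EXPLAIN (ANALYZE, COSTS OFF)
SELECT k, count(*), sum(v) FROM spill_demo GROUP BY k;

RESET work_mem;
RESET enable_sort;
RESET enable_hashagg_disk;
DROP TABLE spill_demo;

With enable_hashagg_disk turned off instead, the planner simply avoids hashed aggregation plans whose hash tables are expected to exceed work_mem; it does not change executor behavior once a hash aggregate is running, since spilling happens whenever the memory limit is actually hit.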