diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 49963ff429..387d263e87 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -1901,18 +1901,21 @@ show_hash_info(HashState *hashstate, ExplainState *es) if (es->format != EXPLAIN_FORMAT_TEXT) { ExplainPropertyLong("Hash Buckets", hashtable->nbuckets, es); + ExplainPropertyLong("Original Hash Buckets", + hashtable->nbuckets_original, es); ExplainPropertyLong("Hash Batches", hashtable->nbatch, es); ExplainPropertyLong("Original Hash Batches", hashtable->nbatch_original, es); ExplainPropertyLong("Peak Memory Usage", spacePeakKb, es); } - else if (hashtable->nbatch_original != hashtable->nbatch) + else if ((hashtable->nbatch_original != hashtable->nbatch) || + (hashtable->nbuckets_original != hashtable->nbuckets)) { appendStringInfoSpaces(es->str, es->indent * 2); appendStringInfo(es->str, - "Buckets: %d Batches: %d (originally %d) Memory Usage: %ldkB\n", - hashtable->nbuckets, hashtable->nbatch, - hashtable->nbatch_original, spacePeakKb); + "Buckets: %d (originally %d) Batches: %d (originally %d) Memory Usage: %ldkB\n", + hashtable->nbuckets, hashtable->nbuckets_original, + hashtable->nbatch, hashtable->nbatch_original, spacePeakKb); } else { diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index b428c18b5c..7c5bb77b0c 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -39,6 +39,7 @@ static void ExecHashIncreaseNumBatches(HashJoinTable hashtable); +static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable); static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse); static void ExecHashSkewTableInsert(HashJoinTable hashtable, @@ -117,6 +118,7 @@ MultiExecHash(HashState *node) /* It's a skew tuple, so put it into that hash table */ ExecHashSkewTableInsert(hashtable, slot, hashvalue, bucketNumber); + hashtable->skewTuples += 1; } else { @@ -127,6 +129,25 @@ MultiExecHash(HashState *node) } } + /* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */ + if (hashtable->nbuckets != hashtable->nbuckets_optimal) + { + /* We never decrease the number of buckets. */ + Assert(hashtable->nbuckets_optimal > hashtable->nbuckets); + +#ifdef HJDEBUG + printf("Increasing nbuckets %d => %d\n", + hashtable->nbuckets, hashtable->nbuckets_optimal); +#endif + + ExecHashIncreaseNumBuckets(hashtable); + } + + /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */ + hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple); + if (hashtable->spaceUsed > hashtable->spacePeak) + hashtable->spacePeak = hashtable->spaceUsed; + /* must provide our own instrumentation support */ if (node->ps.instrument) InstrStopNode(node->ps.instrument, hashtable->totalTuples); @@ -272,7 +293,10 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) */ hashtable = (HashJoinTable) palloc(sizeof(HashJoinTableData)); hashtable->nbuckets = nbuckets; + hashtable->nbuckets_original = nbuckets; + hashtable->nbuckets_optimal = nbuckets; hashtable->log2_nbuckets = log2_nbuckets; + hashtable->log2_nbuckets_optimal = log2_nbuckets; hashtable->buckets = NULL; hashtable->keepNulls = keepNulls; hashtable->skewEnabled = false; @@ -286,6 +310,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) hashtable->nbatch_outstart = nbatch; hashtable->growEnabled = true; hashtable->totalTuples = 0; + hashtable->skewTuples = 0; hashtable->innerBatchFile = NULL; hashtable->outerBatchFile = NULL; hashtable->spaceUsed = 0; @@ -620,6 +645,19 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) */ ninmemory = nfreed = 0; + /* If know we need to resize nbuckets, we can do it while rebatching. */ + if (hashtable->nbuckets_optimal != hashtable->nbuckets) + { + /* we never decrease the number of buckets */ + Assert(hashtable->nbuckets_optimal > hashtable->nbuckets); + + hashtable->nbuckets = hashtable->nbuckets_optimal; + hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal; + + hashtable->buckets = repalloc(hashtable->buckets, + sizeof(HashJoinTuple) * hashtable->nbuckets); + } + /* * We will scan through the chunks directly, so that we can reset the * buckets now and not have to keep track which tuples in the buckets have @@ -703,6 +741,78 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) } } +/* + * ExecHashIncreaseNumBuckets + * increase the original number of buckets in order to reduce + * number of tuples per bucket + */ +static void +ExecHashIncreaseNumBuckets(HashJoinTable hashtable) +{ + HashMemoryChunk chunk; + + /* do nothing if not an increase (it's called increase for a reason) */ + if (hashtable->nbuckets >= hashtable->nbuckets_optimal) + return; + + /* + * We already know the optimal number of buckets, so let's just + * compute the log2_nbuckets for it. + */ + hashtable->nbuckets = hashtable->nbuckets_optimal; + hashtable->log2_nbuckets = my_log2(hashtable->nbuckets_optimal); + + Assert(hashtable->nbuckets > 1); + Assert(hashtable->nbuckets <= (INT_MAX / 2)); + Assert(hashtable->nbuckets == (1 << hashtable->log2_nbuckets)); + +#ifdef HJDEBUG + printf("Increasing nbuckets to %d\n", hashtable->nbuckets); +#endif + + /* + * Just reallocate the proper number of buckets - we don't need to + * walk through them - we can walk the dense-allocated chunks + * (just like in ExecHashIncreaseNumBatches, but without all the + * copying into new chunks) + */ + hashtable->buckets = + (HashJoinTuple *) repalloc(hashtable->buckets, + hashtable->nbuckets * sizeof(HashJoinTuple)); + + memset(hashtable->buckets, 0, sizeof(void *) * hashtable->nbuckets); + + /* scan through all tuples in all chunks to rebuild the hash table */ + for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next) + { + /* process all tuples stored in this chunk */ + size_t idx = 0; + while (idx < chunk->used) + { + HashJoinTuple hashTuple = (HashJoinTuple) (chunk->data + idx); + int bucketno; + int batchno; + + ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue, + &bucketno, &batchno); + + /* add the tuple to the proper bucket */ + hashTuple->next = hashtable->buckets[bucketno]; + hashtable->buckets[bucketno] = hashTuple; + + /* advance index past the tuple */ + idx += MAXALIGN(HJTUPLE_OVERHEAD + + HJTUPLE_MINTUPLE(hashTuple)->t_len); + } + } + +#ifdef HJDEBUG + printf("Nbuckets increased to %d, average items per bucket %.1f\n", + hashtable->nbuckets, batchTuples / hashtable->nbuckets); +#endif +} + + /* * ExecHashTableInsert * insert a tuple into the hash table depending on the hash value @@ -736,6 +846,7 @@ ExecHashTableInsert(HashJoinTable hashtable, */ HashJoinTuple hashTuple; int hashTupleSize; + double ntuples = (hashtable->totalTuples - hashtable->skewTuples); /* Create the HashJoinTuple */ hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len; @@ -756,11 +867,24 @@ ExecHashTableInsert(HashJoinTable hashtable, hashTuple->next = hashtable->buckets[bucketno]; hashtable->buckets[bucketno] = hashTuple; + /* + * Increase the (optimal) number of buckets if we just exceeded the + * NTUP_PER_BUCKET threshold, but only when there's still a single batch. + */ + if ((hashtable->nbatch == 1) && + (hashtable->nbuckets_optimal <= INT_MAX/2) && /* overflow protection */ + (ntuples >= (hashtable->nbuckets_optimal * NTUP_PER_BUCKET))) + { + hashtable->nbuckets_optimal *= 2; + hashtable->log2_nbuckets_optimal += 1; + } + /* Account for space used, and back off if we've used too much */ hashtable->spaceUsed += hashTupleSize; if (hashtable->spaceUsed > hashtable->spacePeak) hashtable->spacePeak = hashtable->spaceUsed; - if (hashtable->spaceUsed + hashtable->nbuckets * sizeof(HashJoinTuple) + if (hashtable->spaceUsed + + hashtable->nbuckets_optimal * sizeof(HashJoinTuple) > hashtable->spaceAllowed) ExecHashIncreaseNumBatches(hashtable); } @@ -885,7 +1009,10 @@ ExecHashGetHashValue(HashJoinTable hashtable, * functions are good about randomizing all their output bits, else we are * likely to have very skewed bucket or batch occupancy.) * - * nbuckets doesn't change over the course of the join. + * nbuckets and log2_nbuckets may change while nbatch == 1 because of dynamic + * bucket count growth. Once we start batching, the value is fixed and does + * not change over the course of the join (making it possible to compute batch + * number the way we do here). * * nbatch is always a power of 2; we increase it only by doubling it. This * effectively adds one more bit to the top of the batchno. diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h index c9e61dfa39..0e1e0cd5f0 100644 --- a/src/include/executor/hashjoin.h +++ b/src/include/executor/hashjoin.h @@ -127,6 +127,10 @@ typedef struct HashJoinTableData int nbuckets; /* # buckets in the in-memory hash table */ int log2_nbuckets; /* its log2 (nbuckets must be a power of 2) */ + int nbuckets_original; /* # buckets when starting the first hash */ + int nbuckets_optimal; /* optimal # buckets (per batch) */ + int log2_nbuckets_optimal; /* same as log2_nbuckets optimal */ + /* buckets[i] is head of list of tuples in i'th in-memory bucket */ struct HashJoinTupleData **buckets; /* buckets array is per-batch storage, as are all the tuples */ @@ -148,6 +152,7 @@ typedef struct HashJoinTableData bool growEnabled; /* flag to shut off nbatch increases */ double totalTuples; /* # tuples obtained from inner plan */ + double skewTuples; /* # tuples inserted into skew tuples */ /* * These arrays are allocated for the life of the hash join, but only if