From 30d7ae3c76d2de144232ae6ab328ca86b70e72c3 Mon Sep 17 00:00:00 2001 From: Kevin Grittner Date: Mon, 13 Oct 2014 10:16:36 -0500 Subject: [PATCH] Increase number of hash join buckets for underestimate. If we expect batching at the very beginning, we size nbuckets for "full work_mem" (see how many tuples we can get into work_mem, while not breaking NTUP_PER_BUCKET threshold). If we expect to be fine without batching, we start with the 'right' nbuckets and track the optimal nbuckets as we go (without actually resizing the hash table). Once we hit work_mem (considering the optimal nbuckets value), we keep the value. At the end of the first batch, we check whether (nbuckets != nbuckets_optimal) and resize the hash table if needed. Also, we keep this value for all batches (it's OK because it assumes full work_mem, and it makes the batchno evaluation trivial). So the resize happens only once. There could be cases where it would improve performance to allow the NTUP_PER_BUCKET threshold to be exceeded to keep everything in one batch rather than spilling to a second batch, but attempts to generate such a case have so far been unsuccessful; that issue may be addressed with a follow-on patch after further investigation. Tomas Vondra with minor format and comment cleanup by me Reviewed by Robert Haas, Heikki Linnakangas, and Kevin Grittner --- src/backend/commands/explain.c | 11 ++- src/backend/executor/nodeHash.c | 131 +++++++++++++++++++++++++++++++- src/include/executor/hashjoin.h | 5 ++ 3 files changed, 141 insertions(+), 6 deletions(-) diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 49963ff429..387d263e87 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -1901,18 +1901,21 @@ show_hash_info(HashState *hashstate, ExplainState *es) if (es->format != EXPLAIN_FORMAT_TEXT) { ExplainPropertyLong("Hash Buckets", hashtable->nbuckets, es); + ExplainPropertyLong("Original Hash Buckets", + hashtable->nbuckets_original, es); ExplainPropertyLong("Hash Batches", hashtable->nbatch, es); ExplainPropertyLong("Original Hash Batches", hashtable->nbatch_original, es); ExplainPropertyLong("Peak Memory Usage", spacePeakKb, es); } - else if (hashtable->nbatch_original != hashtable->nbatch) + else if ((hashtable->nbatch_original != hashtable->nbatch) || + (hashtable->nbuckets_original != hashtable->nbuckets)) { appendStringInfoSpaces(es->str, es->indent * 2); appendStringInfo(es->str, - "Buckets: %d Batches: %d (originally %d) Memory Usage: %ldkB\n", - hashtable->nbuckets, hashtable->nbatch, - hashtable->nbatch_original, spacePeakKb); + "Buckets: %d (originally %d) Batches: %d (originally %d) Memory Usage: %ldkB\n", + hashtable->nbuckets, hashtable->nbuckets_original, + hashtable->nbatch, hashtable->nbatch_original, spacePeakKb); } else { diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index b428c18b5c..7c5bb77b0c 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -39,6 +39,7 @@ static void ExecHashIncreaseNumBatches(HashJoinTable hashtable); +static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable); static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse); static void ExecHashSkewTableInsert(HashJoinTable hashtable, @@ -117,6 +118,7 @@ MultiExecHash(HashState *node) /* It's a skew tuple, so put it into that hash table */ ExecHashSkewTableInsert(hashtable, slot, hashvalue, bucketNumber); + hashtable->skewTuples += 1; } else { @@ -127,6 +129,25 @@ MultiExecHash(HashState *node) } } + /* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */ + if (hashtable->nbuckets != hashtable->nbuckets_optimal) + { + /* We never decrease the number of buckets. */ + Assert(hashtable->nbuckets_optimal > hashtable->nbuckets); + +#ifdef HJDEBUG + printf("Increasing nbuckets %d => %d\n", + hashtable->nbuckets, hashtable->nbuckets_optimal); +#endif + + ExecHashIncreaseNumBuckets(hashtable); + } + + /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */ + hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple); + if (hashtable->spaceUsed > hashtable->spacePeak) + hashtable->spacePeak = hashtable->spaceUsed; + /* must provide our own instrumentation support */ if (node->ps.instrument) InstrStopNode(node->ps.instrument, hashtable->totalTuples); @@ -272,7 +293,10 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) */ hashtable = (HashJoinTable) palloc(sizeof(HashJoinTableData)); hashtable->nbuckets = nbuckets; + hashtable->nbuckets_original = nbuckets; + hashtable->nbuckets_optimal = nbuckets; hashtable->log2_nbuckets = log2_nbuckets; + hashtable->log2_nbuckets_optimal = log2_nbuckets; hashtable->buckets = NULL; hashtable->keepNulls = keepNulls; hashtable->skewEnabled = false; @@ -286,6 +310,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls) hashtable->nbatch_outstart = nbatch; hashtable->growEnabled = true; hashtable->totalTuples = 0; + hashtable->skewTuples = 0; hashtable->innerBatchFile = NULL; hashtable->outerBatchFile = NULL; hashtable->spaceUsed = 0; @@ -620,6 +645,19 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) */ ninmemory = nfreed = 0; + /* If know we need to resize nbuckets, we can do it while rebatching. */ + if (hashtable->nbuckets_optimal != hashtable->nbuckets) + { + /* we never decrease the number of buckets */ + Assert(hashtable->nbuckets_optimal > hashtable->nbuckets); + + hashtable->nbuckets = hashtable->nbuckets_optimal; + hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal; + + hashtable->buckets = repalloc(hashtable->buckets, + sizeof(HashJoinTuple) * hashtable->nbuckets); + } + /* * We will scan through the chunks directly, so that we can reset the * buckets now and not have to keep track which tuples in the buckets have @@ -703,6 +741,78 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) } } +/* + * ExecHashIncreaseNumBuckets + * increase the original number of buckets in order to reduce + * number of tuples per bucket + */ +static void +ExecHashIncreaseNumBuckets(HashJoinTable hashtable) +{ + HashMemoryChunk chunk; + + /* do nothing if not an increase (it's called increase for a reason) */ + if (hashtable->nbuckets >= hashtable->nbuckets_optimal) + return; + + /* + * We already know the optimal number of buckets, so let's just + * compute the log2_nbuckets for it. + */ + hashtable->nbuckets = hashtable->nbuckets_optimal; + hashtable->log2_nbuckets = my_log2(hashtable->nbuckets_optimal); + + Assert(hashtable->nbuckets > 1); + Assert(hashtable->nbuckets <= (INT_MAX / 2)); + Assert(hashtable->nbuckets == (1 << hashtable->log2_nbuckets)); + +#ifdef HJDEBUG + printf("Increasing nbuckets to %d\n", hashtable->nbuckets); +#endif + + /* + * Just reallocate the proper number of buckets - we don't need to + * walk through them - we can walk the dense-allocated chunks + * (just like in ExecHashIncreaseNumBatches, but without all the + * copying into new chunks) + */ + hashtable->buckets = + (HashJoinTuple *) repalloc(hashtable->buckets, + hashtable->nbuckets * sizeof(HashJoinTuple)); + + memset(hashtable->buckets, 0, sizeof(void *) * hashtable->nbuckets); + + /* scan through all tuples in all chunks to rebuild the hash table */ + for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next) + { + /* process all tuples stored in this chunk */ + size_t idx = 0; + while (idx < chunk->used) + { + HashJoinTuple hashTuple = (HashJoinTuple) (chunk->data + idx); + int bucketno; + int batchno; + + ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue, + &bucketno, &batchno); + + /* add the tuple to the proper bucket */ + hashTuple->next = hashtable->buckets[bucketno]; + hashtable->buckets[bucketno] = hashTuple; + + /* advance index past the tuple */ + idx += MAXALIGN(HJTUPLE_OVERHEAD + + HJTUPLE_MINTUPLE(hashTuple)->t_len); + } + } + +#ifdef HJDEBUG + printf("Nbuckets increased to %d, average items per bucket %.1f\n", + hashtable->nbuckets, batchTuples / hashtable->nbuckets); +#endif +} + + /* * ExecHashTableInsert * insert a tuple into the hash table depending on the hash value @@ -736,6 +846,7 @@ ExecHashTableInsert(HashJoinTable hashtable, */ HashJoinTuple hashTuple; int hashTupleSize; + double ntuples = (hashtable->totalTuples - hashtable->skewTuples); /* Create the HashJoinTuple */ hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len; @@ -756,11 +867,24 @@ ExecHashTableInsert(HashJoinTable hashtable, hashTuple->next = hashtable->buckets[bucketno]; hashtable->buckets[bucketno] = hashTuple; + /* + * Increase the (optimal) number of buckets if we just exceeded the + * NTUP_PER_BUCKET threshold, but only when there's still a single batch. + */ + if ((hashtable->nbatch == 1) && + (hashtable->nbuckets_optimal <= INT_MAX/2) && /* overflow protection */ + (ntuples >= (hashtable->nbuckets_optimal * NTUP_PER_BUCKET))) + { + hashtable->nbuckets_optimal *= 2; + hashtable->log2_nbuckets_optimal += 1; + } + /* Account for space used, and back off if we've used too much */ hashtable->spaceUsed += hashTupleSize; if (hashtable->spaceUsed > hashtable->spacePeak) hashtable->spacePeak = hashtable->spaceUsed; - if (hashtable->spaceUsed + hashtable->nbuckets * sizeof(HashJoinTuple) + if (hashtable->spaceUsed + + hashtable->nbuckets_optimal * sizeof(HashJoinTuple) > hashtable->spaceAllowed) ExecHashIncreaseNumBatches(hashtable); } @@ -885,7 +1009,10 @@ ExecHashGetHashValue(HashJoinTable hashtable, * functions are good about randomizing all their output bits, else we are * likely to have very skewed bucket or batch occupancy.) * - * nbuckets doesn't change over the course of the join. + * nbuckets and log2_nbuckets may change while nbatch == 1 because of dynamic + * bucket count growth. Once we start batching, the value is fixed and does + * not change over the course of the join (making it possible to compute batch + * number the way we do here). * * nbatch is always a power of 2; we increase it only by doubling it. This * effectively adds one more bit to the top of the batchno. diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h index c9e61dfa39..0e1e0cd5f0 100644 --- a/src/include/executor/hashjoin.h +++ b/src/include/executor/hashjoin.h @@ -127,6 +127,10 @@ typedef struct HashJoinTableData int nbuckets; /* # buckets in the in-memory hash table */ int log2_nbuckets; /* its log2 (nbuckets must be a power of 2) */ + int nbuckets_original; /* # buckets when starting the first hash */ + int nbuckets_optimal; /* optimal # buckets (per batch) */ + int log2_nbuckets_optimal; /* same as log2_nbuckets optimal */ + /* buckets[i] is head of list of tuples in i'th in-memory bucket */ struct HashJoinTupleData **buckets; /* buckets array is per-batch storage, as are all the tuples */ @@ -148,6 +152,7 @@ typedef struct HashJoinTableData bool growEnabled; /* flag to shut off nbatch increases */ double totalTuples; /* # tuples obtained from inner plan */ + double skewTuples; /* # tuples inserted into skew tuples */ /* * These arrays are allocated for the life of the hash join, but only if