Increase number of hash join buckets for underestimate.

If we expect batching at the very beginning, we size nbuckets for
"full work_mem": we estimate how many tuples fit into work_mem
without exceeding the NTUP_PER_BUCKET threshold, as sketched below.
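
A minimal sketch of that sizing (not the committed sizing code;
work_mem_bytes, tuple_width, and the helper names are illustrative,
and the NTUP_PER_BUCKET value is assumed here):

    #include <stdint.h>

    #define NTUP_PER_BUCKET 10      /* assumed value, for illustration only */

    /* nbuckets must be a power of 2, so round the estimate up. */
    static int64_t
    next_pow2(int64_t n)
    {
        int64_t result = 1;

        while (result < n)
            result <<= 1;
        return result;
    }

    /*
     * Estimate how many tuples fit into the memory budget, then size the
     * bucket array so NTUP_PER_BUCKET is not exceeded at full work_mem.
     */
    static int64_t
    initial_nbuckets(int64_t work_mem_bytes, int64_t tuple_width)
    {
        int64_t max_tuples = work_mem_bytes / tuple_width;

        return next_pow2(max_tuples / NTUP_PER_BUCKET);
    }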

If we expect to be fine without batching, we start with the 'right'
nbuckets and track the optimal nbuckets as we go (without actually
resizing the hash table). Once we hit work_mem (accounting for the
optimal nbuckets value), we stop increasing it; see the sketch after
this paragraph.
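
In outline, the tracking is one cheap check per inserted tuple. A
condensed sketch using the fields this patch adds to HashJoinTableData
(update_optimal_nbuckets is a hypothetical helper; the real logic is
inlined in the ExecHashTableInsert hunk below):

    #include <limits.h>

    /*
     * Hypothetical helper mirroring the tracking step in
     * ExecHashTableInsert: while everything still fits in a single batch,
     * double nbuckets_optimal whenever the tuple count crosses the
     * NTUP_PER_BUCKET threshold.  Nothing is resized here; the bucket
     * array is rebuilt at most once, later.
     */
    static void
    update_optimal_nbuckets(HashJoinTable hashtable, double ntuples)
    {
        if (hashtable->nbatch == 1 &&
            hashtable->nbuckets_optimal <= INT_MAX / 2 &&   /* overflow guard */
            ntuples >= hashtable->nbuckets_optimal * NTUP_PER_BUCKET)
        {
            hashtable->nbuckets_optimal *= 2;
            hashtable->log2_nbuckets_optimal += 1;
        }
    }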

At the end of the first batch, we check whether nbuckets !=
nbuckets_optimal and resize the hash table if needed. We also keep
this value for all subsequent batches (that is safe because it
assumes full work_mem, and it keeps the batchno computation trivial;
see the sketch below). So the resize happens only once.
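
A fixed nbuckets is what keeps the batchno computation trivial: bucket
and batch numbers come from disjoint bit ranges of the hash value. A
minimal sketch of that split (get_bucket_and_batch is an illustrative
stand-in for ExecHashGetBucketAndBatch in nodeHash.c):

    #include <stdint.h>

    /*
     * bucketno uses the low log2_nbuckets bits of the hash value; batchno
     * uses the bits directly above them.  Doubling nbatch adds one more
     * bit to the top of batchno, so a tuple either keeps its batch number
     * or moves to a later batch -- but only if log2_nbuckets stays fixed.
     */
    static void
    get_bucket_and_batch(uint32_t hashvalue, int log2_nbuckets,
                         int nbuckets, int nbatch,
                         int *bucketno, int *batchno)
    {
        *bucketno = (int) (hashvalue & (uint32_t) (nbuckets - 1));
        *batchno = (int) ((hashvalue >> log2_nbuckets) & (uint32_t) (nbatch - 1));
    }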

There could be cases where it would improve performance to allow
the NTUP_PER_BUCKET threshold to be exceeded to keep everything in
one batch rather than spilling to a second batch, but attempts to
generate such a case have so far been unsuccessful; that issue may
be addressed with a follow-on patch after further investigation.

Tomas Vondra with minor format and comment cleanup by me
Reviewed by Robert Haas, Heikki Linnakangas, and Kevin Grittner
Kevin Grittner 2014-10-13 10:16:36 -05:00
parent 494affbd90
commit 30d7ae3c76
3 changed files with 141 additions and 6 deletions

src/backend/commands/explain.c

@@ -1901,18 +1901,21 @@ show_hash_info(HashState *hashstate, ExplainState *es)
     if (es->format != EXPLAIN_FORMAT_TEXT)
     {
         ExplainPropertyLong("Hash Buckets", hashtable->nbuckets, es);
+        ExplainPropertyLong("Original Hash Buckets",
+                            hashtable->nbuckets_original, es);
         ExplainPropertyLong("Hash Batches", hashtable->nbatch, es);
         ExplainPropertyLong("Original Hash Batches",
                             hashtable->nbatch_original, es);
         ExplainPropertyLong("Peak Memory Usage", spacePeakKb, es);
     }
-    else if (hashtable->nbatch_original != hashtable->nbatch)
+    else if ((hashtable->nbatch_original != hashtable->nbatch) ||
+             (hashtable->nbuckets_original != hashtable->nbuckets))
     {
         appendStringInfoSpaces(es->str, es->indent * 2);
         appendStringInfo(es->str,
-                         "Buckets: %d  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
-                         hashtable->nbuckets, hashtable->nbatch,
-                         hashtable->nbatch_original, spacePeakKb);
+                         "Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
+                         hashtable->nbuckets, hashtable->nbuckets_original,
+                         hashtable->nbatch, hashtable->nbatch_original, spacePeakKb);
     }
     else
     {

src/backend/executor/nodeHash.c

@@ -39,6 +39,7 @@
 static void ExecHashIncreaseNumBatches(HashJoinTable hashtable);
+static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable);
 static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node,
                                   int mcvsToUse);
 static void ExecHashSkewTableInsert(HashJoinTable hashtable,
                                     TupleTableSlot *slot,
                                     uint32 hashvalue,
@@ -117,6 +118,7 @@ MultiExecHash(HashState *node)
             /* It's a skew tuple, so put it into that hash table */
             ExecHashSkewTableInsert(hashtable, slot, hashvalue,
                                     bucketNumber);
+            hashtable->skewTuples += 1;
         }
         else
         {
@@ -127,6 +129,25 @@ MultiExecHash(HashState *node)
         }
     }

+    /* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */
+    if (hashtable->nbuckets != hashtable->nbuckets_optimal)
+    {
+        /* We never decrease the number of buckets. */
+        Assert(hashtable->nbuckets_optimal > hashtable->nbuckets);
+
+#ifdef HJDEBUG
+        printf("Increasing nbuckets %d => %d\n",
+               hashtable->nbuckets, hashtable->nbuckets_optimal);
+#endif
+
+        ExecHashIncreaseNumBuckets(hashtable);
+    }
+
+    /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
+    hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
+    if (hashtable->spaceUsed > hashtable->spacePeak)
+        hashtable->spacePeak = hashtable->spaceUsed;
+
     /* must provide our own instrumentation support */
     if (node->ps.instrument)
         InstrStopNode(node->ps.instrument, hashtable->totalTuples);
@@ -272,7 +293,10 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
      */
     hashtable = (HashJoinTable) palloc(sizeof(HashJoinTableData));
     hashtable->nbuckets = nbuckets;
+    hashtable->nbuckets_original = nbuckets;
+    hashtable->nbuckets_optimal = nbuckets;
     hashtable->log2_nbuckets = log2_nbuckets;
+    hashtable->log2_nbuckets_optimal = log2_nbuckets;
     hashtable->buckets = NULL;
     hashtable->keepNulls = keepNulls;
     hashtable->skewEnabled = false;
@@ -286,6 +310,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
     hashtable->nbatch_outstart = nbatch;
     hashtable->growEnabled = true;
     hashtable->totalTuples = 0;
+    hashtable->skewTuples = 0;
     hashtable->innerBatchFile = NULL;
     hashtable->outerBatchFile = NULL;
     hashtable->spaceUsed = 0;
@@ -620,6 +645,19 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
      */
     ninmemory = nfreed = 0;

+    /* If we know we need to resize nbuckets, we can do it while rebatching. */
+    if (hashtable->nbuckets_optimal != hashtable->nbuckets)
+    {
+        /* we never decrease the number of buckets */
+        Assert(hashtable->nbuckets_optimal > hashtable->nbuckets);
+
+        hashtable->nbuckets = hashtable->nbuckets_optimal;
+        hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal;
+
+        hashtable->buckets = repalloc(hashtable->buckets,
+                                      sizeof(HashJoinTuple) * hashtable->nbuckets);
+    }
+
     /*
      * We will scan through the chunks directly, so that we can reset the
      * buckets now and not have to keep track which tuples in the buckets have
@@ -703,6 +741,78 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
     }
 }

+/*
+ * ExecHashIncreaseNumBuckets
+ *      increase the original number of buckets in order to reduce
+ *      number of tuples per bucket
+ */
+static void
+ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
+{
+    HashMemoryChunk chunk;
+
+    /* do nothing if not an increase (it's called increase for a reason) */
+    if (hashtable->nbuckets >= hashtable->nbuckets_optimal)
+        return;
+
+    /*
+     * We already know the optimal number of buckets, so let's just
+     * compute the log2_nbuckets for it.
+     */
+    hashtable->nbuckets = hashtable->nbuckets_optimal;
+    hashtable->log2_nbuckets = my_log2(hashtable->nbuckets_optimal);
+
+    Assert(hashtable->nbuckets > 1);
+    Assert(hashtable->nbuckets <= (INT_MAX / 2));
+    Assert(hashtable->nbuckets == (1 << hashtable->log2_nbuckets));
+
+#ifdef HJDEBUG
+    printf("Increasing nbuckets to %d\n", hashtable->nbuckets);
+#endif
+
+    /*
+     * Just reallocate the proper number of buckets - we don't need to
+     * walk through them - we can walk the dense-allocated chunks
+     * (just like in ExecHashIncreaseNumBatches, but without all the
+     * copying into new chunks)
+     */
+    hashtable->buckets =
+        (HashJoinTuple *) repalloc(hashtable->buckets,
+                                   hashtable->nbuckets * sizeof(HashJoinTuple));
+
+    memset(hashtable->buckets, 0, sizeof(void *) * hashtable->nbuckets);
+
+    /* scan through all tuples in all chunks to rebuild the hash table */
+    for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next)
+    {
+        /* process all tuples stored in this chunk */
+        size_t idx = 0;
+
+        while (idx < chunk->used)
+        {
+            HashJoinTuple hashTuple = (HashJoinTuple) (chunk->data + idx);
+            int bucketno;
+            int batchno;
+
+            ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
+                                      &bucketno, &batchno);
+
+            /* add the tuple to the proper bucket */
+            hashTuple->next = hashtable->buckets[bucketno];
+            hashtable->buckets[bucketno] = hashTuple;
+
+            /* advance index past the tuple */
+            idx += MAXALIGN(HJTUPLE_OVERHEAD +
+                            HJTUPLE_MINTUPLE(hashTuple)->t_len);
+        }
+    }
+
+#ifdef HJDEBUG
+    printf("Nbuckets increased to %d, average items per bucket %.1f\n",
+           hashtable->nbuckets, (hashtable->totalTuples - hashtable->skewTuples) / hashtable->nbuckets);
+#endif
+}
+
 /*
  * ExecHashTableInsert
  *      insert a tuple into the hash table depending on the hash value
@@ -736,6 +846,7 @@ ExecHashTableInsert(HashJoinTable hashtable,
      */
     HashJoinTuple hashTuple;
     int hashTupleSize;
+    double ntuples = (hashtable->totalTuples - hashtable->skewTuples);

     /* Create the HashJoinTuple */
     hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
@@ -756,11 +867,24 @@
         hashTuple->next = hashtable->buckets[bucketno];
         hashtable->buckets[bucketno] = hashTuple;

+        /*
+         * Increase the (optimal) number of buckets if we just exceeded the
+         * NTUP_PER_BUCKET threshold, but only when there's still a single batch.
+         */
+        if ((hashtable->nbatch == 1) &&
+            (hashtable->nbuckets_optimal <= INT_MAX / 2) &&  /* overflow protection */
+            (ntuples >= (hashtable->nbuckets_optimal * NTUP_PER_BUCKET)))
+        {
+            hashtable->nbuckets_optimal *= 2;
+            hashtable->log2_nbuckets_optimal += 1;
+        }
+
         /* Account for space used, and back off if we've used too much */
         hashtable->spaceUsed += hashTupleSize;
         if (hashtable->spaceUsed > hashtable->spacePeak)
             hashtable->spacePeak = hashtable->spaceUsed;
-        if (hashtable->spaceUsed + hashtable->nbuckets * sizeof(HashJoinTuple)
+        if (hashtable->spaceUsed +
+            hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
             > hashtable->spaceAllowed)
             ExecHashIncreaseNumBatches(hashtable);
     }
@@ -885,7 +1009,10 @@ ExecHashGetHashValue(HashJoinTable hashtable,
  * functions are good about randomizing all their output bits, else we are
  * likely to have very skewed bucket or batch occupancy.)
  *
- * nbuckets doesn't change over the course of the join.
+ * nbuckets and log2_nbuckets may change while nbatch == 1 because of dynamic
+ * bucket count growth. Once we start batching, the value is fixed and does
+ * not change over the course of the join (making it possible to compute batch
+ * number the way we do here).
  *
  * nbatch is always a power of 2; we increase it only by doubling it. This
  * effectively adds one more bit to the top of the batchno.

src/include/executor/hashjoin.h

@@ -127,6 +127,10 @@ typedef struct HashJoinTableData
     int nbuckets;           /* # buckets in the in-memory hash table */
     int log2_nbuckets;      /* its log2 (nbuckets must be a power of 2) */

+    int nbuckets_original;  /* # buckets when starting the first hash */
+    int nbuckets_optimal;   /* optimal # buckets (per batch) */
+    int log2_nbuckets_optimal;  /* log2 of nbuckets_optimal */
+
     /* buckets[i] is head of list of tuples in i'th in-memory bucket */
     struct HashJoinTupleData **buckets;
     /* buckets array is per-batch storage, as are all the tuples */
@@ -148,6 +152,7 @@ typedef struct HashJoinTableData
     bool growEnabled;       /* flag to shut off nbatch increases */

     double totalTuples;     /* # tuples obtained from inner plan */
+    double skewTuples;      /* # tuples inserted into skew buckets */

     /*
      * These arrays are allocated for the life of the hash join, but only if