Make EXPLAIN report maximum hashtable usage across multiple rescans.

Before discarding the old hash table in ExecReScanHashJoin, capture
its statistics, ensuring that we report the maximum hashtable size
across repeated rescans of the hash input relation.  We can repurpose
the existing code for reporting hashtable size in parallel workers
to help with this, making the patch pretty small.  This also ensures
that if rescans happen within parallel workers, we get the correct
maximums across all instances.

Konstantin Knizhnik and Tom Lane, per diagnosis by Thomas Munro
of a trouble report from Alvaro Herrera.

Discussion: https://postgr.es/m/20200323165059.GA24950@alvherre.pgsql
Tom Lane 2020-04-11 12:39:19 -04:00
parent 5c27bce7f3
commit 969f9d0b4b
5 changed files with 87 additions and 49 deletions
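The heart of the patch is a switch from "overwrite the stats with whatever the current hash table says" to "fold each hash table instance's stats into a running maximum". Below is a minimal standalone sketch of that pattern; the struct and function names are invented for illustration and are not PostgreSQL code.

/*
 * Standalone sketch of the accumulate-maxima pattern (illustrative names,
 * not PostgreSQL code): fold each hash table instance's stats into a
 * zero-initialized accumulator instead of overwriting it.
 */
#include <stdio.h>
#include <stddef.h>

#define MAX(a, b) ((a) > (b) ? (a) : (b))

struct hash_stats
{
    int     nbatch;         /* number of batches used by one instance */
    size_t  space_peak;     /* peak memory usage of one instance, in bytes */
};

/* Fold one instance's stats into the running maxima. */
static void
accum_stats(struct hash_stats *acc, const struct hash_stats *instance)
{
    acc->nbatch = MAX(acc->nbatch, instance->nbatch);
    acc->space_peak = MAX(acc->space_peak, instance->space_peak);
}

int
main(void)
{
    /* two rescans built two hash tables; the first one was larger */
    struct hash_stats first = {8, 4 * 1024 * 1024};
    struct hash_stats second = {4, 1 * 1024 * 1024};
    struct hash_stats report = {0};

    accum_stats(&report, &first);
    accum_stats(&report, &second);

    /* prints the maxima: nbatch=8 space_peak=4194304 */
    printf("nbatch=%d space_peak=%zu\n", report.nbatch, report.space_peak);
    return 0;
}

Because the same fold is applied at rescan time, at worker shutdown, and when merging per-worker entries, one accumulator function can serve all three paths, which is what keeps the patch small.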

src/backend/commands/explain.c

@@ -2964,22 +2964,25 @@ show_hash_info(HashState *hashstate, ExplainState *es)
     HashInstrumentation hinstrument = {0};
 
     /*
+     * Collect stats from the local process, even when it's a parallel query.
      * In a parallel query, the leader process may or may not have run the
      * hash join, and even if it did it may not have built a hash table due to
      * timing (if it started late it might have seen no tuples in the outer
      * relation and skipped building the hash table).  Therefore we have to be
      * prepared to get instrumentation data from all participants.
      */
-    if (hashstate->hashtable)
-        ExecHashGetInstrumentation(&hinstrument, hashstate->hashtable);
+    if (hashstate->hinstrument)
+        memcpy(&hinstrument, hashstate->hinstrument,
+               sizeof(HashInstrumentation));
 
     /*
      * Merge results from workers.  In the parallel-oblivious case, the
      * results from all participants should be identical, except where
      * participants didn't run the join at all so have no data.  In the
      * parallel-aware case, we need to consider all the results.  Each worker
-     * may have seen a different subset of batches and we want to find the
-     * highest memory usage for any one batch across all batches.
+     * may have seen a different subset of batches and we want to report the
+     * highest memory usage across all batches.  We take the maxima of other
+     * values too, for the same reasons as in ExecHashAccumInstrumentation.
      */
     if (hashstate->shared_info)
     {
@@ -2990,31 +2993,16 @@ show_hash_info(HashState *hashstate, ExplainState *es)
         {
             HashInstrumentation *worker_hi = &shared_info->hinstrument[i];
 
-            if (worker_hi->nbatch > 0)
-            {
-                /*
-                 * Every participant should agree on the buckets, so to be
-                 * sure we have a value we'll just overwrite each time.
-                 */
-                hinstrument.nbuckets = worker_hi->nbuckets;
-                hinstrument.nbuckets_original = worker_hi->nbuckets_original;
-
-                /*
-                 * Normally every participant should agree on the number of
-                 * batches too, but it's possible for a backend that started
-                 * late and missed the whole join not to have the final nbatch
-                 * number.  So we'll take the largest number.
-                 */
-                hinstrument.nbatch = Max(hinstrument.nbatch, worker_hi->nbatch);
-                hinstrument.nbatch_original = worker_hi->nbatch_original;
-
-                /*
-                 * In a parallel-aware hash join, for now we report the
-                 * maximum peak memory reported by any worker.
-                 */
-                hinstrument.space_peak =
-                    Max(hinstrument.space_peak, worker_hi->space_peak);
-            }
+            hinstrument.nbuckets = Max(hinstrument.nbuckets,
+                                       worker_hi->nbuckets);
+            hinstrument.nbuckets_original = Max(hinstrument.nbuckets_original,
+                                                worker_hi->nbuckets_original);
+            hinstrument.nbatch = Max(hinstrument.nbatch,
+                                     worker_hi->nbatch);
+            hinstrument.nbatch_original = Max(hinstrument.nbatch_original,
+                                              worker_hi->nbatch_original);
+            hinstrument.space_peak = Max(hinstrument.space_peak,
+                                         worker_hi->space_peak);
         }
     }
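To make the merge above concrete, here is a simplified model of what show_hash_info now does with the per-worker entries; again the types and names are illustrative, not the executor's own. The per-worker areas start out zeroed (see the memset added to ExecHashInitializeDSM below), so entries from workers that never ran the join contribute nothing to the maxima.

/*
 * Simplified model (not the actual executor code) of the merge now done in
 * show_hash_info: start from the local process's saved stats and take
 * field-wise maxima over every worker's entry.
 */
#include <stddef.h>

#define MAX(a, b) ((a) > (b) ? (a) : (b))

struct hash_stats
{
    int     nbuckets;
    int     nbatch;
    size_t  space_peak;
};

static struct hash_stats
merge_stats(struct hash_stats local,
            const struct hash_stats *workers, int num_workers)
{
    for (int i = 0; i < num_workers; i++)
    {
        /* all-zero entries from idle workers are ignored naturally */
        local.nbuckets = MAX(local.nbuckets, workers[i].nbuckets);
        local.nbatch = MAX(local.nbatch, workers[i].nbatch);
        local.space_peak = MAX(local.space_peak, workers[i].space_peak);
    }
    return local;
}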

src/backend/executor/nodeHash.c

@@ -2597,7 +2597,10 @@ ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt)
     size = offsetof(SharedHashInfo, hinstrument) +
         pcxt->nworkers * sizeof(HashInstrumentation);
     node->shared_info = (SharedHashInfo *) shm_toc_allocate(pcxt->toc, size);
+
+    /* Each per-worker area must start out as zeroes. */
+    memset(node->shared_info, 0, size);
+
     node->shared_info->num_workers = pcxt->nworkers;
     shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id,
                    node->shared_info);
@@ -2616,22 +2619,33 @@ ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt)
     if (!node->ps.instrument)
         return;
 
+    /*
+     * Find our entry in the shared area, and set up a pointer to it so that
+     * we'll accumulate stats there when shutting down or rebuilding the hash
+     * table.
+     */
     shared_info = (SharedHashInfo *)
         shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
     node->hinstrument = &shared_info->hinstrument[ParallelWorkerNumber];
 }
 
 /*
- * Copy instrumentation data from this worker's hash table (if it built one)
- * to DSM memory so the leader can retrieve it.  This must be done in an
- * ExecShutdownHash() rather than ExecEndHash() because the latter runs after
- * we've detached from the DSM segment.
+ * Collect EXPLAIN stats if needed, saving them into DSM memory if
+ * ExecHashInitializeWorker was called, or local storage if not.  In the
+ * parallel case, this must be done in ExecShutdownHash() rather than
+ * ExecEndHash() because the latter runs after we've detached from the DSM
+ * segment.
  */
 void
 ExecShutdownHash(HashState *node)
 {
+    /* Allocate save space if EXPLAIN'ing and we didn't do so already */
+    if (node->ps.instrument && !node->hinstrument)
+        node->hinstrument = (HashInstrumentation *)
+            palloc0(sizeof(HashInstrumentation));
+
+    /* Now accumulate data for the current (final) hash table */
     if (node->hinstrument && node->hashtable)
-        ExecHashGetInstrumentation(node->hinstrument, node->hashtable);
+        ExecHashAccumInstrumentation(node->hinstrument, node->hashtable);
 }
 
 /*
@@ -2655,18 +2669,34 @@ ExecHashRetrieveInstrumentation(HashState *node)
 }
 
 /*
- * Copy the instrumentation data from 'hashtable' into a HashInstrumentation
- * struct.
+ * Accumulate instrumentation data from 'hashtable' into an
+ * initially-zeroed HashInstrumentation struct.
+ *
+ * This is used to merge information across successive hash table instances
+ * within a single plan node.  We take the maximum values of each interesting
+ * number.  The largest nbuckets and largest nbatch values might have occurred
+ * in different instances, so there's some risk of confusion from reporting
+ * unrelated numbers; but there's a bigger risk of misdiagnosing a performance
+ * issue if we don't report the largest values.  Similarly, we want to report
+ * the largest spacePeak regardless of whether it happened in the same
+ * instance as the largest nbuckets or nbatch.  All the instances should have
+ * the same nbuckets_original and nbatch_original; but there's little value
+ * in depending on that here, so handle them the same way.
  */
 void
-ExecHashGetInstrumentation(HashInstrumentation *instrument,
-                           HashJoinTable hashtable)
+ExecHashAccumInstrumentation(HashInstrumentation *instrument,
+                             HashJoinTable hashtable)
 {
-    instrument->nbuckets = hashtable->nbuckets;
-    instrument->nbuckets_original = hashtable->nbuckets_original;
-    instrument->nbatch = hashtable->nbatch;
-    instrument->nbatch_original = hashtable->nbatch_original;
-    instrument->space_peak = hashtable->spacePeak;
+    instrument->nbuckets = Max(instrument->nbuckets,
+                               hashtable->nbuckets);
+    instrument->nbuckets_original = Max(instrument->nbuckets_original,
+                                        hashtable->nbuckets_original);
+    instrument->nbatch = Max(instrument->nbatch,
+                             hashtable->nbatch);
+    instrument->nbatch_original = Max(instrument->nbatch_original,
+                                      hashtable->nbatch_original);
+    instrument->space_peak = Max(instrument->space_peak,
+                                 hashtable->spacePeak);
 }
 
 /*
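A hedged sketch of the shutdown logic above, with calloc standing in for palloc0 and invented names throughout: if stats are wanted but no save area exists yet (no rescan happened and no parallel worker set one up), a zeroed one is allocated now, and the final hash table's stats are folded in.

/*
 * Sketch only, under the assumptions stated above; not PostgreSQL code.
 */
#include <stdlib.h>

struct hash_stats { int nbatch; size_t space_peak; };

/* fold one instance's stats into the running maxima */
static void
accum_stats(struct hash_stats *acc, const struct hash_stats *inst)
{
    if (inst->nbatch > acc->nbatch)
        acc->nbatch = inst->nbatch;
    if (inst->space_peak > acc->space_peak)
        acc->space_peak = inst->space_peak;
}

static void
shutdown_hash(int collecting_stats, struct hash_stats **save_area,
              const struct hash_stats *current_table)
{
    /* allocate save space lazily, only if stats are being collected */
    if (collecting_stats && *save_area == NULL)
        *save_area = calloc(1, sizeof(struct hash_stats));

    /* accumulate data for the current (final) hash table, if one was built */
    if (*save_area && current_table)
        accum_stats(*save_area, current_table);
}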

src/backend/executor/nodeHashjoin.c

@@ -1338,8 +1338,16 @@ ExecReScanHashJoin(HashJoinState *node)
             /* must destroy and rebuild hash table */
             HashState  *hashNode = castNode(HashState, innerPlanState(node));
 
-            /* for safety, be sure to clear child plan node's pointer too */
             Assert(hashNode->hashtable == node->hj_HashTable);
+            /* accumulate stats from old hash table, if wanted */
+            /* (this should match ExecShutdownHash) */
+            if (hashNode->ps.instrument && !hashNode->hinstrument)
+                hashNode->hinstrument = (HashInstrumentation *)
+                    palloc0(sizeof(HashInstrumentation));
+            if (hashNode->hinstrument)
+                ExecHashAccumInstrumentation(hashNode->hinstrument,
+                                             hashNode->hashtable);
+            /* for safety, be sure to clear child plan node's pointer too */
             hashNode->hashtable = NULL;
 
             ExecHashTableDestroy(node->hj_HashTable);
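The rescan path deliberately mirrors the shutdown path. Continuing the sketch above (same invented struct hash_stats and accum_stats helper, with calloc and free standing in for palloc0 and ExecHashTableDestroy): the old table's stats are folded into the save area before the table is discarded, so the final EXPLAIN report covers every table this node ever built.

/*
 * Continuation of the sketch above; not PostgreSQL code.
 */
static void
rescan_hash(int collecting_stats, struct hash_stats **save_area,
            struct hash_stats **current_table)
{
    /* accumulate stats from the old hash table; mirrors shutdown_hash */
    if (collecting_stats && *save_area == NULL)
        *save_area = calloc(1, sizeof(struct hash_stats));
    if (*save_area)
        accum_stats(*save_area, *current_table);

    /* only now is it safe to discard the old table and rebuild */
    free(*current_table);
    *current_table = NULL;
}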

src/include/executor/nodeHash.h

@@ -73,7 +73,7 @@ extern void ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt);
 extern void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt);
 extern void ExecHashRetrieveInstrumentation(HashState *node);
 extern void ExecShutdownHash(HashState *node);
-extern void ExecHashGetInstrumentation(HashInstrumentation *instrument,
-                                       HashJoinTable hashtable);
+extern void ExecHashAccumInstrumentation(HashInstrumentation *instrument,
+                                         HashJoinTable hashtable);
 
 #endif   /* NODEHASH_H */

src/include/nodes/execnodes.h

@@ -2358,7 +2358,7 @@ typedef struct HashInstrumentation
     int         nbuckets_original;  /* planned number of buckets */
     int         nbatch;             /* number of batches at end of execution */
     int         nbatch_original;    /* planned number of batches */
-    size_t      space_peak;         /* peak memory usage in bytes */
+    Size        space_peak;         /* peak memory usage in bytes */
 } HashInstrumentation;
 
 /* ----------------
@@ -2381,8 +2381,20 @@ typedef struct HashState
     HashJoinTable hashtable;    /* hash table for the hashjoin */
     List       *hashkeys;       /* list of ExprState nodes */
-    SharedHashInfo *shared_info;    /* one entry per worker */
-    HashInstrumentation *hinstrument;   /* this worker's entry */
+
+    /*
+     * In a parallelized hash join, the leader retains a pointer to the
+     * shared-memory stats area in its shared_info field, and then copies the
+     * shared-memory info back to local storage before DSM shutdown.  The
+     * shared_info field remains NULL in workers, or in non-parallel joins.
+     */
+    SharedHashInfo *shared_info;
+
+    /*
+     * If we are collecting hash stats, this points to an initially-zeroed
+     * collection area, which could be either local storage or in shared
+     * memory; either way it's for just one process.
+     */
+    HashInstrumentation *hinstrument;
+
     /* Parallel hash state. */
     struct ParallelHashJoinState *parallel_state;