Fix EXPLAIN ANALYZE output for Parallel Hash.

In a race case, EXPLAIN ANALYZE could fail to display correct nbatch
and size information.  Refactor so that participants report only on
batches they worked on rather than trying to report on all of them,
and teach explain.c to consider the HashInstrumentation object from
all participants instead of picking the first one it can find.  This
should fix an occasional build farm failure in the "join" regression
test.

Author: Thomas Munro
Reviewed-By: Andres Freund
Discussion: https://postgr.es/m/30219.1514428346%40sss.pgh.pa.us
Andres Freund committed 2018-01-01 14:38:23 -08:00
parent 6078770c1a
commit 93ea78b17c
4 changed files with 62 additions and 51 deletions

src/backend/commands/explain.c

@@ -2379,62 +2379,87 @@ show_sort_info(SortState *sortstate, ExplainState *es)
 static void
 show_hash_info(HashState *hashstate, ExplainState *es)
 {
-    HashInstrumentation *hinstrument = NULL;
+    HashInstrumentation hinstrument = {0};
 
     /*
      * In a parallel query, the leader process may or may not have run the
      * hash join, and even if it did it may not have built a hash table due to
      * timing (if it started late it might have seen no tuples in the outer
      * relation and skipped building the hash table).  Therefore we have to be
-     * prepared to get instrumentation data from a worker if there is no hash
-     * table.
+     * prepared to get instrumentation data from all participants.
      */
     if (hashstate->hashtable)
-    {
-        hinstrument = (HashInstrumentation *)
-            palloc(sizeof(HashInstrumentation));
-        ExecHashGetInstrumentation(hinstrument, hashstate->hashtable);
-    }
-    else if (hashstate->shared_info)
+        ExecHashGetInstrumentation(&hinstrument, hashstate->hashtable);
+
+    /*
+     * Merge results from workers.  In the parallel-oblivious case, the
+     * results from all participants should be identical, except where
+     * participants didn't run the join at all so have no data.  In the
+     * parallel-aware case, we need to consider all the results.  Each worker
+     * may have seen a different subset of batches and we want to find the
+     * highest memory usage for any one batch across all batches.
+     */
+    if (hashstate->shared_info)
     {
         SharedHashInfo *shared_info = hashstate->shared_info;
         int         i;
 
-        /* Find the first worker that built a hash table. */
         for (i = 0; i < shared_info->num_workers; ++i)
         {
-            if (shared_info->hinstrument[i].nbatch > 0)
+            HashInstrumentation *worker_hi = &shared_info->hinstrument[i];
+
+            if (worker_hi->nbatch > 0)
             {
-                hinstrument = &shared_info->hinstrument[i];
-                break;
+                /*
+                 * Every participant should agree on the buckets, so to be
+                 * sure we have a value we'll just overwrite each time.
+                 */
+                hinstrument.nbuckets = worker_hi->nbuckets;
+                hinstrument.nbuckets_original = worker_hi->nbuckets_original;
+
+                /*
+                 * Normally every participant should agree on the number of
+                 * batches too, but it's possible for a backend that started
+                 * late and missed the whole join not to have the final nbatch
+                 * number.  So we'll take the largest number.
+                 */
+                hinstrument.nbatch = Max(hinstrument.nbatch, worker_hi->nbatch);
+                hinstrument.nbatch_original = worker_hi->nbatch_original;
+
+                /*
+                 * In a parallel-aware hash join, for now we report the
+                 * maximum peak memory reported by any worker.
+                 */
+                hinstrument.space_peak =
+                    Max(hinstrument.space_peak, worker_hi->space_peak);
             }
         }
     }
 
-    if (hinstrument)
+    if (hinstrument.nbatch > 0)
     {
-        long        spacePeakKb = (hinstrument->space_peak + 1023) / 1024;
+        long        spacePeakKb = (hinstrument.space_peak + 1023) / 1024;
 
         if (es->format != EXPLAIN_FORMAT_TEXT)
         {
-            ExplainPropertyLong("Hash Buckets", hinstrument->nbuckets, es);
+            ExplainPropertyLong("Hash Buckets", hinstrument.nbuckets, es);
             ExplainPropertyLong("Original Hash Buckets",
-                                hinstrument->nbuckets_original, es);
-            ExplainPropertyLong("Hash Batches", hinstrument->nbatch, es);
+                                hinstrument.nbuckets_original, es);
+            ExplainPropertyLong("Hash Batches", hinstrument.nbatch, es);
             ExplainPropertyLong("Original Hash Batches",
-                                hinstrument->nbatch_original, es);
+                                hinstrument.nbatch_original, es);
             ExplainPropertyLong("Peak Memory Usage", spacePeakKb, es);
         }
-        else if (hinstrument->nbatch_original != hinstrument->nbatch ||
-                 hinstrument->nbuckets_original != hinstrument->nbuckets)
+        else if (hinstrument.nbatch_original != hinstrument.nbatch ||
+                 hinstrument.nbuckets_original != hinstrument.nbuckets)
         {
             appendStringInfoSpaces(es->str, es->indent * 2);
             appendStringInfo(es->str,
                              "Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
-                             hinstrument->nbuckets,
-                             hinstrument->nbuckets_original,
-                             hinstrument->nbatch,
-                             hinstrument->nbatch_original,
+                             hinstrument.nbuckets,
+                             hinstrument.nbuckets_original,
+                             hinstrument.nbatch,
+                             hinstrument.nbatch_original,
                              spacePeakKb);
         }
         else
@@ -2442,7 +2467,7 @@ show_hash_info(HashState *hashstate, ExplainState *es)
             appendStringInfoSpaces(es->str, es->indent * 2);
             appendStringInfo(es->str,
                              "Buckets: %d  Batches: %d  Memory Usage: %ldkB\n",
-                             hinstrument->nbuckets, hinstrument->nbatch,
+                             hinstrument.nbuckets, hinstrument.nbatch,
                              spacePeakKb);
         }
 }
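For illustration, here is a standalone C sketch of the merging rule the new show_hash_info() applies. The struct, the hand-rolled Max macro, and the numbers are made-up stand-ins for this sketch, not the real PostgreSQL types:

#include <stdio.h>

#define Max(a, b) ((a) > (b) ? (a) : (b))

/* Simplified stand-in for HashInstrumentation; illustrative only. */
typedef struct
{
    int     nbuckets;
    int     nbatch;
    long    space_peak;     /* peak memory, in bytes */
} HashStats;

int
main(void)
{
    /*
     * Made-up per-participant results: the first backend started late and
     * missed the final batch count, the third never built a hash table.
     */
    HashStats   workers[] = {
        {1024, 2, 180 * 1024},
        {1024, 4, 300 * 1024},
        {0, 0, 0}
    };
    HashStats   merged = {0};

    for (int i = 0; i < 3; i++)
    {
        if (workers[i].nbatch > 0)
        {
            /* Every participant agrees on buckets; overwrite each time. */
            merged.nbuckets = workers[i].nbuckets;
            /* A late starter can have a stale nbatch; keep the largest. */
            merged.nbatch = Max(merged.nbatch, workers[i].nbatch);
            /* Report the highest peak memory any participant saw. */
            merged.space_peak = Max(merged.space_peak, workers[i].space_peak);
        }
    }

    printf("Buckets: %d  Batches: %d  Memory Usage: %ldkB\n",
           merged.nbuckets, merged.nbatch, (merged.space_peak + 1023) / 1024);
    return 0;
}

This prints "Buckets: 1024  Batches: 4  Memory Usage: 300kB". The old code stopped at the first participant with nbatch > 0, so it could have reported the stale nbatch of 2 here, which is the kind of run-to-run instability that occasionally broke the "join" regression test.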

src/backend/executor/nodeHash.c

@@ -3090,7 +3090,16 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
                 batch->buckets = InvalidDsaPointer;
             }
         }
-        ExecParallelHashUpdateSpacePeak(hashtable, curbatch);
+
+        /*
+         * Track the largest batch we've been attached to.  Though each
+         * backend might see a different subset of batches, explain.c will
+         * scan the results from all backends to find the largest value.
+         */
+        hashtable->spacePeak =
+            Max(hashtable->spacePeak,
+                batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
+
         /* Remember that we are not attached to a batch. */
         hashtable->curbatch = -1;
     }
@@ -3295,19 +3304,3 @@ ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size)
 
     return true;
 }
-
-/*
- * Update this backend's copy of hashtable->spacePeak to account for a given
- * batch.  This is called at the end of hashing for batch 0, and then for each
- * batch when it is done or discovered to be already done.  The result is used
- * for EXPLAIN output.
- */
-void
-ExecParallelHashUpdateSpacePeak(HashJoinTable hashtable, int batchno)
-{
-    size_t      size;
-
-    size = hashtable->batches[batchno].shared->size;
-    size += sizeof(dsa_pointer_atomic) * hashtable->nbuckets;
-    hashtable->spacePeak = Max(hashtable->spacePeak, size);
-}
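As a worked example of the accounting above (made-up sizes, and assuming an 8-byte dsa_pointer_atomic): a backend detaching from a batch with batch->size = 2 MB and nbuckets = 131072 charges 2 MB + 131072 * 8 bytes = 3 MB against its spacePeak, keeping the previous value if that was already larger. Since each backend now accounts only for batches it actually attached to, explain.c's scan across all participants is what recovers the global maximum.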

src/backend/executor/nodeHashjoin.c

@@ -1186,12 +1186,6 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
                      * remain).
                      */
                     BarrierDetach(batch_barrier);
-
-                    /*
-                     * We didn't work on this batch, but we need to observe
-                     * its size for EXPLAIN.
-                     */
-                    ExecParallelHashUpdateSpacePeak(hashtable, batchno);
                     hashtable->batches[batchno].done = true;
                     hashtable->curbatch = -1;
                     break;

src/include/executor/nodeHash.h

@@ -33,7 +33,6 @@
 extern void ExecHashTableDetach(HashJoinTable hashtable);
 extern void ExecHashTableDetachBatch(HashJoinTable hashtable);
 extern void ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable,
                                                  int batchno);
-extern void ExecParallelHashUpdateSpacePeak(HashJoinTable hashtable, int batchno);
 extern void ExecHashTableInsert(HashJoinTable hashtable,
                                 TupleTableSlot *slot,