Revert "Fix race in Parallel Hash Join batch cleanup."

This reverts commit 8fa2478b40.

Discussion: https://postgr.es/m/CA%2BhUKGJmcqAE3MZeDCLLXa62cWM0AJbKmp2JrJYaJ86bz36LFA%40mail.gmail.com
This commit is contained in:
Thomas Munro 2021-03-18 01:06:01 +13:00
parent 8fa2478b40
commit b9ed85698f
3 changed files with 32 additions and 58 deletions

View File

@@ -333,21 +333,14 @@ MultiExecParallelHash(HashState *node)
 		hashtable->nbuckets = pstate->nbuckets;
 		hashtable->log2_nbuckets = my_log2(hashtable->nbuckets);
 		hashtable->totalTuples = pstate->total_tuples;
-
-		/*
-		 * Unless we're completely done and the batch state has been freed, make
-		 * sure we have accessors.
-		 */
-		if (BarrierPhase(build_barrier) < PHJ_BUILD_DONE)
-			ExecParallelHashEnsureBatchAccessors(hashtable);
+		ExecParallelHashEnsureBatchAccessors(hashtable);
 
 		/*
 		 * The next synchronization point is in ExecHashJoin's HJ_BUILD_HASHTABLE
-		 * case, which will bring the build phase to PHJ_BUILD_RUNNING (if it isn't
+		 * case, which will bring the build phase to PHJ_BUILD_DONE (if it isn't
 		 * there already).
 		 */
 		Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
-			   BarrierPhase(build_barrier) == PHJ_BUILD_RUNNING ||
 			   BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
 	}
@@ -631,7 +624,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 	/*
 	 * The next Parallel Hash synchronization point is in
 	 * MultiExecParallelHash(), which will progress it all the way to
-	 * PHJ_BUILD_RUNNING.  The caller must not return control from this
+	 * PHJ_BUILD_DONE.  The caller must not return control from this
 	 * executor node between now and then.
 	 */
 }
@@ -3026,11 +3019,14 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
 	}
 
 	/*
-	 * We should never see a state where the batch-tracking array is freed,
-	 * because we should have given up sooner if we join when the build barrier
-	 * has reached the PHJ_BUILD_DONE phase.
+	 * It's possible for a backend to start up very late so that the whole
+	 * join is finished and the shm state for tracking batches has already
+	 * been freed by ExecHashTableDetach().  In that case we'll just leave
+	 * hashtable->batches as NULL so that ExecParallelHashJoinNewBatch() gives
+	 * up early.
 	 */
-	Assert(DsaPointerIsValid(pstate->batches));
+	if (!DsaPointerIsValid(pstate->batches))
+		return;
 
 	/* Use hash join memory context. */
 	oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
@@ -3150,17 +3146,9 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
 void
 ExecHashTableDetach(HashJoinTable hashtable)
 {
-	ParallelHashJoinState *pstate = hashtable->parallel_state;
-
-	/*
-	 * If we're involved in a parallel query, we must either have got all the
-	 * way to PHJ_BUILD_RUNNING, or joined too late and be in PHJ_BUILD_DONE.
-	 */
-	Assert(!pstate ||
-		   BarrierPhase(&pstate->build_barrier) >= PHJ_BUILD_RUNNING);
-
-	if (pstate && BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_RUNNING)
+	if (hashtable->parallel_state)
 	{
+		ParallelHashJoinState *pstate = hashtable->parallel_state;
 		int			i;
 
 		/* Make sure any temporary files are closed. */
@@ -3176,23 +3164,18 @@ ExecHashTableDetach(HashJoinTable hashtable)
 		}
 
 		/* If we're last to detach, clean up shared memory. */
-		if (BarrierArriveAndDetach(&pstate->build_barrier))
+		if (BarrierDetach(&pstate->build_barrier))
 		{
-			/*
-			 * Late joining processes will see this state and give up
-			 * immediately.
-			 */
-			Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_DONE);
-
 			if (DsaPointerIsValid(pstate->batches))
 			{
 				dsa_free(hashtable->area, pstate->batches);
 				pstate->batches = InvalidDsaPointer;
 			}
 		}
-	}
 
-	hashtable->parallel_state = NULL;
+		hashtable->parallel_state = NULL;
+	}
 }
/* /*
* Get the first tuple in a given bucket identified by number. * Get the first tuple in a given bucket identified by number.

View File

@@ -45,8 +45,7 @@
  *   PHJ_BUILD_ALLOCATING     -- one sets up the batches and table 0
  *   PHJ_BUILD_HASHING_INNER  -- all hash the inner rel
  *   PHJ_BUILD_HASHING_OUTER  -- (multi-batch only) all hash the outer
- *   PHJ_BUILD_RUNNING        -- building done, probing can begin
- *   PHJ_BUILD_DONE           -- all work complete, one frees batches
+ *   PHJ_BUILD_DONE           -- building done, probing can begin
  *
  * While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may
  * be used repeatedly as required to coordinate expansions in the number of
@@ -74,7 +73,7 @@
  * batches whenever it encounters them while scanning and probing, which it
  * can do because it processes batches in serial order.
  *
- * Once PHJ_BUILD_RUNNING is reached, backends then split up and process
+ * Once PHJ_BUILD_DONE is reached, backends then split up and process
  * different batches, or gang up and work together on probing batches if there
  * aren't enough to go around.  For each batch there is a separate barrier
  * with the following phases:
@@ -96,16 +95,11 @@
  *
  * To avoid deadlocks, we never wait for any barrier unless it is known that
  * all other backends attached to it are actively executing the node or have
- * finished.  Practically, that means that we never emit a tuple while attached
- * to a barrier, unless the barrier has reached a phase that means that no
- * process will wait on it again.  We emit tuples while attached to the build
- * barrier in phase PHJ_BUILD_RUNNING, and to a per-batch barrier in phase
- * PHJ_BATCH_PROBING.  These are advanced to PHJ_BUILD_DONE and PHJ_BATCH_DONE
- * respectively without waiting, using BarrierArriveAndDetach().  The last to
- * detach receives a different return value so that it knows that it's safe to
- * clean up.  Any straggler process that attaches after that phase is reached
- * will see that it's too late to participate or access the relevant shared
- * memory objects.
+ * already arrived.  Practically, that means that we never return a tuple
+ * while attached to a barrier, unless the barrier has reached its final
+ * state.  In the slightly special case of the per-batch barrier, we return
+ * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use
+ * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting.
  *
  *-------------------------------------------------------------------------
  */
@@ -323,7 +317,6 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				build_barrier = &parallel_state->build_barrier;
 				Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
-					   BarrierPhase(build_barrier) == PHJ_BUILD_RUNNING ||
 					   BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
 				if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
 				{
@@ -336,18 +329,9 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 					BarrierArriveAndWait(build_barrier,
 										 WAIT_EVENT_HASH_BUILD_HASHING_OUTER);
 				}
-				else if (BarrierPhase(build_barrier) == PHJ_BUILD_DONE)
-				{
-					/*
-					 * If we attached so late that the job is finished and
-					 * the batch state has been freed, we can return
-					 * immediately.
-					 */
-					return NULL;
-				}
+				Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
 
 				/* Each backend should now select a batch to work on. */
-				Assert(BarrierPhase(build_barrier) == PHJ_BUILD_RUNNING);
 				hashtable->curbatch = -1;
 				node->hj_JoinState = HJ_NEED_NEW_BATCH;
@ -1106,6 +1090,14 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
int start_batchno; int start_batchno;
int batchno; int batchno;
/*
* If we started up so late that the batch tracking array has been freed
* already by ExecHashTableDetach(), then we are finished. See also
* ExecParallelHashEnsureBatchAccessors().
*/
if (hashtable->batches == NULL)
return false;
/* /*
* If we were already attached to a batch, remember not to bother checking * If we were already attached to a batch, remember not to bother checking
* it again, and detach from it (possibly freeing the hash table if we are * it again, and detach from it (possibly freeing the hash table if we are

View File

@@ -258,8 +258,7 @@ typedef struct ParallelHashJoinState
 #define PHJ_BUILD_ALLOCATING			1
 #define PHJ_BUILD_HASHING_INNER			2
 #define PHJ_BUILD_HASHING_OUTER			3
-#define PHJ_BUILD_RUNNING				4
-#define PHJ_BUILD_DONE					5
+#define PHJ_BUILD_DONE					4
 
 /* The phases for probing each batch, used by for batch_barrier. */
 #define PHJ_BATCH_ELECTING				0