/*-------------------------------------------------------------------------
 *
 * barrier.c
 *	  Barriers for synchronizing cooperating processes.
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * From Wikipedia[1]: "In parallel computing, a barrier is a type of
 * synchronization method.  A barrier for a group of threads or processes in
 * the source code means any thread/process must stop at this point and cannot
 * proceed until all other threads/processes reach this barrier."
 *
 * This implementation of barriers allows for static sets of participants
 * known up front, or dynamic sets of participants which processes can join or
 * leave at any time.  In the dynamic case, a phase number can be used to
 * track progress through a parallel algorithm, and may be necessary to
 * synchronize with the current phase of a multi-phase algorithm when a new
 * participant joins.  In the static case, the phase number is used
 * internally, but it isn't strictly necessary for client code to access it
 * because the phase can only advance when the declared number of participants
 * reaches the barrier, so client code should be in no doubt about the current
 * phase of computation at all times.
 *
 * Consider a parallel algorithm that involves separate phases of computation
 * A, B and C where the output of each phase is needed before the next phase
 * can begin.
 *
 * In the case of a static barrier initialized with 4 participants, each
 * participant works on phase A, then calls BarrierArriveAndWait to wait until
 * all 4 participants have reached that point.  When BarrierArriveAndWait
 * returns control, each participant can work on B, and so on.  Because the
 * barrier knows how many participants to expect, the phases of computation
 * don't need labels or numbers, since each process's program counter implies
 * the current phase.  Even if some of the processes are slow to start up and
 * begin running phase A, the other participants are expecting them and will
 * patiently wait at the barrier.  The code could be written as follows:
 *
 *     perform_a();
 *     BarrierArriveAndWait(&barrier, ...);
 *     perform_b();
 *     BarrierArriveAndWait(&barrier, ...);
 *     perform_c();
 *     BarrierArriveAndWait(&barrier, ...);
 *
 * If the number of participants is not known up front, then a dynamic barrier
 * is needed and the number should be set to zero at initialization.  New
 * complications arise because the number necessarily changes over time as
 * participants attach and detach, and therefore phases B, C or even the end
 * of processing may be reached before any given participant has started
 * running and attached.  Therefore the client code must perform an initial
 * test of the phase number after attaching, because it needs to find out
 * which phase of the algorithm has been reached by any participants that are
 * already attached in order to synchronize with that work.  Once the program
 * counter or some other representation of current progress is synchronized
 * with the barrier's phase, normal control flow can be used just as in the
 * static case.  Our example could be written using a switch statement with
 * cases that fall-through, as follows:
 *
 *     phase = BarrierAttach(&barrier);
 *     switch (phase)
 *     {
 *     case PHASE_A:
 *         perform_a();
 *         BarrierArriveAndWait(&barrier, ...);
 *     case PHASE_B:
 *         perform_b();
 *         BarrierArriveAndWait(&barrier, ...);
 *     case PHASE_C:
 *         perform_c();
 *         BarrierArriveAndWait(&barrier, ...);
 *     }
 *     BarrierDetach(&barrier);
 *
 * Static barriers behave similarly to POSIX's pthread_barrier_t.  Dynamic
 * barriers behave similarly to Java's java.util.concurrent.Phaser.
 *
 * [1] https://en.wikipedia.org/wiki/Barrier_(computer_science)
 *
 * IDENTIFICATION
 *	  src/backend/storage/ipc/barrier.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"
#include "storage/barrier.h"

static inline bool BarrierDetachImpl(Barrier *barrier, bool arrive);

|
/*
|
|
* Initialize this barrier. To use a static party size, provide the number of
|
|
* participants to wait for at each phase indicating that that number of
|
|
* backends is implicitly attached. To use a dynamic party size, specify zero
|
|
* here and then use BarrierAttach() and
|
|
* BarrierDetach()/BarrierArriveAndDetach() to register and deregister
|
|
* participants explicitly.
|
|
*/
|
|
void
|
|
BarrierInit(Barrier *barrier, int participants)
|
|
{
|
|
SpinLockInit(&barrier->mutex);
|
|
barrier->participants = participants;
|
|
barrier->arrived = 0;
|
|
barrier->phase = 0;
|
|
barrier->elected = 0;
|
|
barrier->static_party = participants > 0;
|
|
ConditionVariableInit(&barrier->condition_variable);
|
|
}
|
|
|
|
/*
 * Arrive at this barrier, wait for all other attached participants to arrive
 * too and then return.  Increments the current phase.  The caller must be
 * attached.
 *
 * While waiting, pg_stat_activity shows a wait_event_class and wait_event
 * controlled by the wait_event_info passed in, which should be a value from
 * one of the WaitEventXXX enums defined in pgstat.h.
 *
 * Return true in one arbitrarily chosen participant.  Return false in all
 * others.  The return code can be used to elect one participant to execute a
 * phase of work that must be done serially while other participants wait.
 */
bool
BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)
{
	bool		release = false;
	bool		elected;
	int			start_phase;
	int			next_phase;

	/*
	 * Record our arrival under the mutex.  If we are the last arrival, also
	 * advance the phase and pre-elect ourselves (see barrier->elected note
	 * below) in the same critical section, so other backends waking up see a
	 * consistent state.
	 */
	SpinLockAcquire(&barrier->mutex);
	start_phase = barrier->phase;
	next_phase = start_phase + 1;
	++barrier->arrived;
	if (barrier->arrived == barrier->participants)
	{
		release = true;
		barrier->arrived = 0;
		barrier->phase = next_phase;
		barrier->elected = next_phase;
	}
	SpinLockRelease(&barrier->mutex);

	/*
	 * If we were the last expected participant to arrive, we can release our
	 * peers and return true to indicate that this backend has been elected to
	 * perform any serial work.
	 */
	if (release)
	{
		ConditionVariableBroadcast(&barrier->condition_variable);

		return true;
	}

	/*
	 * Otherwise we have to wait for the last participant to arrive and
	 * advance the phase.
	 */
	elected = false;
	ConditionVariablePrepareToSleep(&barrier->condition_variable);
	for (;;)
	{
		/*
		 * We know that phase must either be start_phase, indicating that we
		 * need to keep waiting, or next_phase, indicating that the last
		 * participant that we were waiting for has either arrived or detached
		 * so that the next phase has begun.  The phase cannot advance any
		 * further than that without this backend's participation, because
		 * this backend is attached.
		 */
		SpinLockAcquire(&barrier->mutex);
		Assert(barrier->phase == start_phase || barrier->phase == next_phase);
		release = barrier->phase == next_phase;
		if (release && barrier->elected != next_phase)
		{
			/*
			 * Usually the backend that arrives last and releases the other
			 * backends is elected to return true (see above), so that it can
			 * begin processing serial work while it has a CPU timeslice.
			 * However, if the barrier advanced because someone detached, then
			 * one of the backends that is awoken will need to be elected.
			 */
			barrier->elected = barrier->phase;
			elected = true;
		}
		SpinLockRelease(&barrier->mutex);
		if (release)
			break;
		ConditionVariableSleep(&barrier->condition_variable, wait_event_info);
	}
	/* Done waiting; stop listening on the condition variable. */
	ConditionVariableCancelSleep();

	return elected;
}
|
|
|
|
/*
 * Arrive at this barrier, but detach rather than waiting.  Returns true if
 * the caller was the last to detach.
 *
 * Thin wrapper around BarrierDetachImpl() with arrive = true, meaning the
 * phase is advanced even if no other participants remain waiting.
 */
bool
BarrierArriveAndDetach(Barrier *barrier)
{
	return BarrierDetachImpl(barrier, true);
}
|
|
|
|
/*
|
|
* Attach to a barrier. All waiting participants will now wait for this
|
|
* participant to call BarrierArriveAndWait(), BarrierDetach() or
|
|
* BarrierArriveAndDetach(). Return the current phase.
|
|
*/
|
|
int
|
|
BarrierAttach(Barrier *barrier)
|
|
{
|
|
int phase;
|
|
|
|
Assert(!barrier->static_party);
|
|
|
|
SpinLockAcquire(&barrier->mutex);
|
|
++barrier->participants;
|
|
phase = barrier->phase;
|
|
SpinLockRelease(&barrier->mutex);
|
|
|
|
return phase;
|
|
}
|
|
|
|
/*
 * Detach from a barrier.  This may release other waiters from BarrierWait and
 * advance the phase if they were only waiting for this backend.  Return true
 * if this participant was the last to detach.
 *
 * Thin wrapper around BarrierDetachImpl() with arrive = false: unlike
 * BarrierArriveAndDetach(), the phase is not advanced when no other
 * participants remain.
 */
bool
BarrierDetach(Barrier *barrier)
{
	return BarrierDetachImpl(barrier, false);
}
|
|
|
|
/*
 * Return the current phase of a barrier.  The caller must be attached.
 */
int
BarrierPhase(Barrier *barrier)
{
	/*
	 * It is OK to read barrier->phase without locking, because it can't
	 * change without us (we are attached to it), and we executed a memory
	 * barrier when we either attached or participated in changing it last
	 * time.
	 */
	return barrier->phase;
}
|
|
|
|
/*
|
|
* Return an instantaneous snapshot of the number of participants currently
|
|
* attached to this barrier. For debugging purposes only.
|
|
*/
|
|
int
|
|
BarrierParticipants(Barrier *barrier)
|
|
{
|
|
int participants;
|
|
|
|
SpinLockAcquire(&barrier->mutex);
|
|
participants = barrier->participants;
|
|
SpinLockRelease(&barrier->mutex);
|
|
|
|
return participants;
|
|
}
|
|
|
|
/*
 * Detach from a barrier.  If 'arrive' is true then also increment the phase
 * if there are no other participants.  If there are other participants
 * waiting, then the phase will be advanced and they'll be released if they
 * were only waiting for the caller.  Return true if this participant was the
 * last to detach.
 *
 * Shared implementation behind BarrierDetach() and BarrierArriveAndDetach().
 */
static inline bool
BarrierDetachImpl(Barrier *barrier, bool arrive)
{
	bool		release;
	bool		last;

	/* Detaching only makes sense for dynamic parties. */
	Assert(!barrier->static_party);

	SpinLockAcquire(&barrier->mutex);
	Assert(barrier->participants > 0);
	--barrier->participants;

	/*
	 * If any other participants are waiting and we were the last participant
	 * waited for, release them.  If no other participants are waiting, but
	 * this is a BarrierArriveAndDetach() call, then advance the phase too.
	 */
	if ((arrive || barrier->participants > 0) &&
		barrier->arrived == barrier->participants)
	{
		release = true;
		barrier->arrived = 0;
		++barrier->phase;
	}
	else
		release = false;

	/* Note whether we were the last member of the party, before unlocking. */
	last = barrier->participants == 0;
	SpinLockRelease(&barrier->mutex);

	/* Wake the released waiters outside the spinlock. */
	if (release)
		ConditionVariableBroadcast(&barrier->condition_variable);

	return last;
}
|