Support condition variables.

Condition variables provide a flexible way to sleep until a
cooperating process causes an arbitrary condition to become true.  In
simple cases, this can be accomplished with a WaitLatch/ResetLatch
loop; the cooperating process can call SetLatch after performing work
that might cause the condition to be satisfied, and the waiting
process can recheck the condition each time.  However, if the process
performing the work doesn't have an easy way to identify which
processes might be waiting, this doesn't work, because it can't
identify which latches to set.  Condition variables solve that problem
by internally maintaining a list of waiters; a process that may have
caused some waiter's condition to be satisfied must "signal" or
"broadcast" on the condition variable.

Robert Haas and Thomas Munro
This commit is contained in:
Robert Haas 2016-11-22 14:26:40 -05:00
parent 1c7861e81b
commit e8ac886c24
12 changed files with 364 additions and 2 deletions

View File

@ -45,6 +45,7 @@
#include "replication/origin.h"
#include "replication/syncrep.h"
#include "replication/walsender.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
@ -2472,6 +2473,9 @@ AbortTransaction(void)
/* Reset WAL record construction state */
XLogResetInsertion();
/* Cancel condition variable sleep */
ConditionVariableCancelSleep();
/*
* Also clean up any open wait for lock, since the lock manager will choke
* if we try to wait for another lock before doing this.

View File

@ -33,6 +33,7 @@
#include "replication/walreceiver.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
#include "storage/condition_variable.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
@ -536,6 +537,7 @@ static void
ShutdownAuxiliaryProcess(int code, Datum arg)
{
LWLockReleaseAll();
ConditionVariableCancelSleep();
pgstat_report_wait_end();
}

View File

@ -46,6 +46,7 @@
#include "postmaster/bgwriter.h"
#include "storage/bufmgr.h"
#include "storage/buf_internals.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
@ -187,6 +188,7 @@ BackgroundWriterMain(void)
* about in bgwriter, but we do have LWLocks, buffers, and temp files.
*/
LWLockReleaseAll();
ConditionVariableCancelSleep();
AbortBufferIO();
UnlockBuffers();
/* buffer pins are released here: */

View File

@ -49,6 +49,7 @@
#include "postmaster/bgwriter.h"
#include "replication/syncrep.h"
#include "storage/bufmgr.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
@ -271,6 +272,7 @@ CheckpointerMain(void)
* files.
*/
LWLockReleaseAll();
ConditionVariableCancelSleep();
pgstat_report_wait_end();
AbortBufferIO();
UnlockBuffers();

View File

@ -50,6 +50,7 @@
#include "pgstat.h"
#include "postmaster/walwriter.h"
#include "storage/bufmgr.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
@ -167,6 +168,7 @@ WalWriterMain(void)
* about in walwriter, but we do have LWLocks, and perhaps buffers?
*/
LWLockReleaseAll();
ConditionVariableCancelSleep();
pgstat_report_wait_end();
AbortBufferIO();
UnlockBuffers();

View File

@ -66,6 +66,7 @@
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
@ -253,6 +254,7 @@ void
WalSndErrorCleanup(void)
{
LWLockReleaseAll();
ConditionVariableCancelSleep();
pgstat_report_wait_end();
if (sendFile >= 0)

View File

@ -13,7 +13,7 @@ top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = lmgr.o lock.o proc.o deadlock.o lwlock.o lwlocknames.o spin.o \
s_lock.o predicate.o
s_lock.o predicate.o condition_variable.o
include $(top_srcdir)/src/backend/common.mk

View File

@ -0,0 +1,225 @@
/*-------------------------------------------------------------------------
*
* condition_variable.c
* Implementation of condition variables. Condition variables provide
* a way for one process to wait until a specific condition occurs,
* without needing to know the specific identity of the process for
* which they are waiting. Waits for condition variables can be
* interrupted, unlike LWLock waits. Condition variables are safe
* to use within dynamic shared memory segments.
*
* Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/backend/storage/lmgr/condition_variable.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "storage/condition_variable.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/proclist.h"
#include "storage/spin.h"
#include "utils/memutils.h"
/* Initially, we are not prepared to sleep on any condition variable. */
static ConditionVariable *cv_sleep_target = NULL;
/* Reusable WaitEventSet. */
static WaitEventSet *cv_wait_event_set = NULL;
/*
* Initialize a condition variable.
*/
void
ConditionVariableInit(ConditionVariable *cv)
{
SpinLockInit(&cv->mutex);
proclist_init(&cv->wakeup);
}
/*
* Prepare to wait on a given condition variable. This can optionally be
* called before entering a test/sleep loop. Alternatively, the call to
* ConditionVariablePrepareToSleep can be omitted. The only advantage of
* calling ConditionVariablePrepareToSleep is that it avoids an initial
* double-test of the user's predicate in the case that we need to wait.
*/
void
ConditionVariablePrepareToSleep(ConditionVariable *cv)
{
int pgprocno = MyProc->pgprocno;
/*
* It's not legal to prepare a sleep until the previous sleep has been
* completed or canceled.
*/
Assert(cv_sleep_target == NULL);
/* Record the condition variable on which we will sleep. */
cv_sleep_target = cv;
/* Create a reusable WaitEventSet. */
if (cv_wait_event_set == NULL)
{
cv_wait_event_set = CreateWaitEventSet(TopMemoryContext, 1);
AddWaitEventToSet(cv_wait_event_set, WL_LATCH_SET, PGINVALID_SOCKET,
&MyProc->procLatch, NULL);
}
/* Add myself to the wait queue. */
SpinLockAcquire(&cv->mutex);
if (!proclist_contains(&cv->wakeup, pgprocno, cvWaitLink))
proclist_push_tail(&cv->wakeup, pgprocno, cvWaitLink);
SpinLockRelease(&cv->mutex);
/* Reset my latch before entering the caller's predicate loop. */
ResetLatch(&MyProc->procLatch);
}
/*--------------------------------------------------------------------------
* Wait for the given condition variable to be signaled. This should be
* called in a predicate loop that tests for a specfic exit condition and
* otherwise sleeps, like so:
*
* ConditionVariablePrepareToSleep(cv); [optional]
* while (condition for which we are waiting is not true)
* ConditionVariableSleep(cv, wait_event_info);
* ConditionVariableCancelSleep();
*
* Supply a value from one of the WaitEventXXX enums defined in pgstat.h to
* control the contents of pg_stat_activity's wait_event_type and wait_event
* columns while waiting.
*-------------------------------------------------------------------------*/
void
ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
{
WaitEvent event;
bool done = false;
/*
* If the caller didn't prepare to sleep explicitly, then do so now and
* return immediately. The caller's predicate loop should immediately
* call again if its exit condition is not yet met. This initial spurious
* return can be avoided by calling ConditionVariablePrepareToSleep(cv)
* first. Whether it's worth doing that depends on whether you expect the
* condition to be met initially, in which case skipping the prepare
* allows you to skip manipulation of the wait list, or not met intiailly,
* in which case preparing first allows you to skip a spurious test of the
* caller's exit condition.
*/
if (cv_sleep_target == NULL)
{
ConditionVariablePrepareToSleep(cv);
return;
}
/* Any earlier condition variable sleep must have been canceled. */
Assert(cv_sleep_target == cv);
while (!done)
{
CHECK_FOR_INTERRUPTS();
/*
* Wait for latch to be set. We don't care about the result because
* our contract permits spurious returns.
*/
WaitEventSetWait(cv_wait_event_set, -1, &event, 1, wait_event_info);
/* Reset latch before testing whether we can return. */
ResetLatch(&MyProc->procLatch);
/*
* If this process has been taken out of the wait list, then we know
* that is has been signaled by ConditionVariableSignal. We put it
* back into the wait list, so we don't miss any further signals while
* the caller's loop checks its condition. If it hasn't been taken
* out of the wait list, then the latch must have been set by
* something other than ConditionVariableSignal; though we don't
* guarantee not to return spuriously, we'll avoid these obvious
* cases.
*/
SpinLockAcquire(&cv->mutex);
if (!proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink))
{
done = true;
proclist_push_tail(&cv->wakeup, MyProc->pgprocno, cvWaitLink);
}
SpinLockRelease(&cv->mutex);
}
}
/*
* Cancel any pending sleep operation. We just need to remove ourselves
* from the wait queue of any condition variable for which we have previously
* prepared a sleep.
*/
void
ConditionVariableCancelSleep(void)
{
ConditionVariable *cv = cv_sleep_target;
if (cv == NULL)
return;
SpinLockAcquire(&cv->mutex);
if (proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink))
proclist_delete(&cv->wakeup, MyProc->pgprocno, cvWaitLink);
SpinLockRelease(&cv->mutex);
cv_sleep_target = NULL;
}
/*
* Wake up one sleeping process, assuming there is at least one.
*
* The return value indicates whether or not we woke somebody up.
*/
bool
ConditionVariableSignal(ConditionVariable *cv)
{
PGPROC *proc = NULL;
/* Remove the first process from the wakeup queue (if any). */
SpinLockAcquire(&cv->mutex);
if (!proclist_is_empty(&cv->wakeup))
proc = proclist_pop_head_node(&cv->wakeup, cvWaitLink);
SpinLockRelease(&cv->mutex);
/* If we found someone sleeping, set their latch to wake them up. */
if (proc != NULL)
{
SetLatch(&proc->procLatch);
return true;
}
/* No sleeping processes. */
return false;
}
/*
* Wake up all sleeping processes.
*
* The return value indicates the number of processes we woke.
*/
int
ConditionVariableBroadcast(ConditionVariable *cv)
{
int nwoken = 0;
/*
* Let's just do this the dumbest way possible. We could try to dequeue
* all the sleepers at once to save spinlock cycles, but it's a bit hard
* to get that right in the face of possible sleep cancelations, and
* we don't want to loop holding the mutex.
*/
while (ConditionVariableSignal(cv))
++nwoken;
return nwoken;
}

View File

@ -43,6 +43,7 @@
#include "postmaster/autovacuum.h"
#include "replication/slot.h"
#include "replication/syncrep.h"
#include "storage/condition_variable.h"
#include "storage/standby.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
@ -802,6 +803,9 @@ ProcKill(int code, Datum arg)
*/
LWLockReleaseAll();
/* Cancel any pending condition variable sleep, too */
ConditionVariableCancelSleep();
/* Make sure active replication slots are released */
if (MyReplicationSlot != NULL)
ReplicationSlotRelease();
@ -907,6 +911,9 @@ AuxiliaryProcKill(int code, Datum arg)
/* Release any LW locks I am holding (see notes above) */
LWLockReleaseAll();
/* Cancel any pending condition variable sleep, too */
ConditionVariableCancelSleep();
/*
* Reset MyLatch to the process local one. This is so that signal
* handlers et al can continue using the latch after the shared latch

View File

@ -0,0 +1,59 @@
/*-------------------------------------------------------------------------
*
* condition_variable.h
* Condition variables
*
* A condition variable is a method of waiting until a certain condition
* becomes true. Conventionally, a condition variable supports three
* operations: (1) sleep; (2) signal, which wakes up one process sleeping
* on the condition variable; and (3) broadcast, which wakes up every
* process sleeping on the condition variable. In our implementation,
* condition variables put a process into an interruptible sleep (so it
* can be cancelled prior to the fulfillment of the condition) and do not
* use pointers internally (so that they are safe to use within DSMs).
*
* Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/include/storage/condition_variable.h
*
*-------------------------------------------------------------------------
*/
#ifndef CONDITION_VARIABLE_H
#define CONDITION_VARIABLE_H
#include "storage/s_lock.h"
#include "storage/proclist_types.h"
typedef struct
{
slock_t mutex;
proclist_head wakeup;
} ConditionVariable;
/* Initialize a condition variable. */
extern void ConditionVariableInit(ConditionVariable *);
/*
* To sleep on a condition variable, a process should use a loop which first
* checks the condition, exiting the loop if it is met, and then calls
* ConditionVariableSleep. Spurious wakeups are possible, but should be
* infrequent. After exiting the loop, ConditionVariableCancelSleep should
* be called to ensure that the process is no longer in the wait list for
* the condition variable.
*/
extern void ConditionVariableSleep(ConditionVariable *, uint32 wait_event_info);
extern void ConditionVariableCancelSleep(void);
/*
* The use of this function is optional and not necessary for correctness;
* for efficiency, it should be called prior entering the loop described above
* if it is thought that the condition is unlikely to hold immediately.
*/
extern void ConditionVariablePrepareToSleep(ConditionVariable *);
/* Wake up a single waiter (via signal) or all waiters (via broadcast). */
extern bool ConditionVariableSignal(ConditionVariable *);
extern int ConditionVariableBroadcast(ConditionVariable *);
#endif /* CONDITION_VARIABLE_H */

View File

@ -115,6 +115,9 @@ struct PGPROC
uint8 lwWaitMode; /* lwlock mode being waited for */
proclist_node lwWaitLink; /* position in LW lock wait list */
/* Support for condition variables. */
proclist_node cvWaitLink; /* position in CV wait list */
/* Info about lock the process is currently waiting for, if any. */
/* waitLock and waitProcLock are NULL if not currently waiting. */
LOCK *waitLock; /* Lock object we're sleeping on ... */

View File

@ -69,6 +69,8 @@ proclist_push_head_offset(proclist_head *list, int procno, size_t node_offset)
else
{
Assert(list->tail != INVALID_PGPROCNO);
Assert(list->head != procno);
Assert(list->tail != procno);
node->next = list->head;
proclist_node_get(node->next, node_offset)->prev = procno;
node->prev = INVALID_PGPROCNO;
@ -77,7 +79,7 @@ proclist_push_head_offset(proclist_head *list, int procno, size_t node_offset)
}
/*
* Insert a node a the end of a list.
* Insert a node at the end of a list.
*/
static inline void
proclist_push_tail_offset(proclist_head *list, int procno, size_t node_offset)
@ -93,6 +95,8 @@ proclist_push_tail_offset(proclist_head *list, int procno, size_t node_offset)
else
{
Assert(list->head != INVALID_PGPROCNO);
Assert(list->head != procno);
Assert(list->tail != procno);
node->prev = list->tail;
proclist_node_get(node->prev, node_offset)->next = procno;
node->next = INVALID_PGPROCNO;
@ -117,6 +121,52 @@ proclist_delete_offset(proclist_head *list, int procno, size_t node_offset)
list->tail = node->prev;
else
proclist_node_get(node->next, node_offset)->prev = node->prev;
node->next = node->prev = INVALID_PGPROCNO;
}
/*
* Check if a node is currently in a list. It must be known that the node is
* not in any _other_ proclist that uses the same proclist_node, so that the
* only possibilities are that it is in this list or none.
*/
static inline bool
proclist_contains_offset(proclist_head *list, int procno,
size_t node_offset)
{
proclist_node *node = proclist_node_get(procno, node_offset);
/*
* If this is not a member of a proclist, then the next and prev pointers
* should be 0. Circular lists are not allowed so this condition is not
* confusable with a real pgprocno 0.
*/
if (node->prev == 0 && node->next == 0)
return false;
/* If there is a previous node, then this node must be in the list. */
if (node->prev != INVALID_PGPROCNO)
return true;
/*
* There is no previous node, so the only way this node can be in the list
* is if it's the head node.
*/
return list->head == procno;
}
/*
* Remove and return the first node from a list (there must be one).
*/
static inline PGPROC *
proclist_pop_head_node_offset(proclist_head *list, size_t node_offset)
{
PGPROC *proc;
Assert(!proclist_is_empty(list));
proc = GetPGProcByNumber(list->head);
proclist_delete_offset(list, list->head, node_offset);
return proc;
}
/*
@ -129,6 +179,10 @@ proclist_delete_offset(proclist_head *list, int procno, size_t node_offset)
proclist_push_head_offset((list), (procno), offsetof(PGPROC, link_member))
#define proclist_push_tail(list, procno, link_member) \
proclist_push_tail_offset((list), (procno), offsetof(PGPROC, link_member))
#define proclist_pop_head_node(list, link_member) \
proclist_pop_head_node_offset((list), offsetof(PGPROC, link_member))
#define proclist_contains(list, procno, link_member) \
proclist_contains_offset((list), (procno), offsetof(PGPROC, link_member))
/*
* Iterate through the list pointed at by 'lhead', storing the current