diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 45b8b3684f..d3a136b6f5 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -3309,6 +3309,9 @@ WalSndShmemInit(void) SpinLockInit(&walsnd->mutex); } + + ConditionVariableInit(&WalSndCtl->wal_flush_cv); + ConditionVariableInit(&WalSndCtl->wal_replay_cv); } } @@ -3330,31 +3333,17 @@ WalSndShmemInit(void) void WalSndWakeup(bool physical, bool logical) { - int i; + /* + * Wake up all the walsenders waiting on WAL being flushed or replayed + * respectively. Note that waiting walsender would have prepared to sleep + * on the CV (i.e., added itself to the CV's waitlist) in WalSndWait() + * before actually waiting. + */ + if (physical) + ConditionVariableBroadcast(&WalSndCtl->wal_flush_cv); - for (i = 0; i < max_wal_senders; i++) - { - Latch *latch; - ReplicationKind kind; - WalSnd *walsnd = &WalSndCtl->walsnds[i]; - - /* - * Get latch pointer with spinlock held, for the unlikely case that - * pointer reads aren't atomic (as they're 8 bytes). While at it, also - * get kind. - */ - SpinLockAcquire(&walsnd->mutex); - latch = walsnd->latch; - kind = walsnd->kind; - SpinLockRelease(&walsnd->mutex); - - if (latch == NULL) - continue; - - if ((physical && kind == REPLICATION_KIND_PHYSICAL) || - (logical && kind == REPLICATION_KIND_LOGICAL)) - SetLatch(latch); - } + if (logical) + ConditionVariableBroadcast(&WalSndCtl->wal_replay_cv); } /* @@ -3368,9 +3357,44 @@ WalSndWait(uint32 socket_events, long timeout, uint32 wait_event) WaitEvent event; ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL); + + /* + * We use a condition variable to efficiently wake up walsenders in + * WalSndWakeup(). + * + * Every walsender prepares to sleep on a shared memory CV. Note that it + * just prepares to sleep on the CV (i.e., adds itself to the CV's + * waitlist), but does not actually wait on the CV (IOW, it never calls + * ConditionVariableSleep()). It still uses WaitEventSetWait() for + * waiting, because we also need to wait for socket events. The processes + * (startup process, walreceiver etc.) wanting to wake up walsenders use + * ConditionVariableBroadcast(), which in turn calls SetLatch(), helping + * walsenders come out of WaitEventSetWait(). + * + * This approach is simple and efficient because, one doesn't have to loop + * through all the walsenders slots, with a spinlock acquisition and + * release for every iteration, just to wake up only the waiting + * walsenders. It makes WalSndWakeup() callers' life easy. + * + * XXX: A desirable future improvement would be to add support for CVs + * into WaitEventSetWait(). + * + * And, we use separate shared memory CVs for physical and logical + * walsenders for selective wake ups, see WalSndWakeup() for more details. + */ + if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL) + ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv); + else if (MyWalSnd->kind == REPLICATION_KIND_LOGICAL) + ConditionVariablePrepareToSleep(&WalSndCtl->wal_replay_cv); + if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 && (event.events & WL_POSTMASTER_DEATH)) + { + ConditionVariableCancelSleep(); proc_exit(1); + } + + ConditionVariableCancelSleep(); } /* diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index ff25aa70a8..7d919583bd 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -17,6 +17,7 @@ #include "nodes/nodes.h" #include "nodes/replnodes.h" #include "replication/syncrep.h" +#include "storage/condition_variable.h" #include "storage/latch.h" #include "storage/shmem.h" #include "storage/spin.h" @@ -108,6 +109,10 @@ typedef struct */ bool sync_standbys_defined; + /* used as a registry of physical / logical walsenders to wake */ + ConditionVariable wal_flush_cv; + ConditionVariable wal_replay_cv; + WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]; } WalSndCtlData;