diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index 07030a2ef0..3058ce921b 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -1294,15 +1294,21 @@ throttle(size_t increment) /* Only sleep if the transfer is faster than it should be. */ if (sleep > 0) { - ResetLatch(&MyWalSnd->latch); + ResetLatch(MyLatch); + + /* We're eating a potentially set latch, so check for interrupts */ + CHECK_FOR_INTERRUPTS(); /* * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be * the maximum time to sleep. Thus the cast to long is safe. */ - wait_result = WaitLatch(&MyWalSnd->latch, + wait_result = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, (long) (sleep / 1000)); + + if (wait_result & WL_LATCH_SET) + CHECK_FOR_INTERRUPTS(); } else { diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 86c36bf502..05d2339b15 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1081,6 +1081,11 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, if (!PostmasterIsAlive()) exit(1); + /* Clear any already-pending wakeups */ + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + /* Process any requests or signals received recently */ if (got_SIGHUP) { @@ -1092,9 +1097,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, /* Check for input from the client */ ProcessRepliesIfAny(); - /* Clear any already-pending wakeups */ - ResetLatch(&MyWalSnd->latch); - /* Try to flush pending output to the client */ if (pq_flush_if_writable() != 0) WalSndShutdown(); @@ -1117,15 +1119,12 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT; /* Sleep until something happens or we time out */ - ImmediateInterruptOK = true; - CHECK_FOR_INTERRUPTS(); - WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents, + WaitLatchOrSocket(MyLatch, wakeEvents, MyProcPort->sock, sleeptime); - ImmediateInterruptOK = false; } /* reactivate latch so WalSndLoop knows to continue */ - SetLatch(&MyWalSnd->latch); + SetLatch(MyLatch); } /* @@ -1165,6 +1164,11 @@ WalSndWaitForWal(XLogRecPtr loc) if (!PostmasterIsAlive()) exit(1); + /* Clear any already-pending wakeups */ + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + /* Process any requests or signals received recently */ if (got_SIGHUP) { @@ -1176,9 +1180,6 @@ WalSndWaitForWal(XLogRecPtr loc) /* Check for input from the client */ ProcessRepliesIfAny(); - /* Clear any already-pending wakeups */ - ResetLatch(&MyWalSnd->latch); - /* Update our idea of the currently flushed position. */ if (!RecoveryInProgress()) RecentFlushPtr = GetFlushRecPtr(); @@ -1244,15 +1245,12 @@ WalSndWaitForWal(XLogRecPtr loc) wakeEvents |= WL_SOCKET_WRITEABLE; /* Sleep until something happens or we time out */ - ImmediateInterruptOK = true; - CHECK_FOR_INTERRUPTS(); - WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents, + WaitLatchOrSocket(MyLatch, wakeEvents, MyProcPort->sock, sleeptime); - ImmediateInterruptOK = false; } /* reactivate latch so WalSndLoop knows to continue */ - SetLatch(&MyWalSnd->latch); + SetLatch(MyLatch); return RecentFlushPtr; } @@ -1813,6 +1811,11 @@ WalSndLoop(WalSndSendDataCallback send_data) if (!PostmasterIsAlive()) exit(1); + /* Clear any already-pending wakeups */ + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + /* Process any requests or signals received recently */ if (got_SIGHUP) { @@ -1821,14 +1824,9 @@ WalSndLoop(WalSndSendDataCallback send_data) SyncRepInitConfig(); } - CHECK_FOR_INTERRUPTS(); - /* Check for input from the client */ ProcessRepliesIfAny(); - /* Clear any already-pending wakeups */ - ResetLatch(&MyWalSnd->latch); - /* * If we have received CopyDone from the client, sent CopyDone * ourselves, and the output buffer is empty, it's time to exit @@ -1912,11 +1910,8 @@ WalSndLoop(WalSndSendDataCallback send_data) wakeEvents |= WL_SOCKET_WRITEABLE; /* Sleep until something happens or we time out */ - ImmediateInterruptOK = true; - CHECK_FOR_INTERRUPTS(); - WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents, + WaitLatchOrSocket(MyLatch, wakeEvents, MyProcPort->sock, sleeptime); - ImmediateInterruptOK = false; } } return; @@ -1959,9 +1954,9 @@ InitWalSenderSlot(void) walsnd->pid = MyProcPid; walsnd->sentPtr = InvalidXLogRecPtr; walsnd->state = WALSNDSTATE_STARTUP; + walsnd->latch = &MyProc->procLatch; SpinLockRelease(&walsnd->mutex); /* don't need the lock anymore */ - OwnLatch((Latch *) &walsnd->latch); MyWalSnd = (WalSnd *) walsnd; break; @@ -1986,19 +1981,14 @@ WalSndKill(int code, Datum arg) Assert(walsnd != NULL); - /* - * Clear MyWalSnd first; then disown the latch. This is so that signal - * handlers won't try to touch the latch after it's no longer ours. - */ MyWalSnd = NULL; - DisownLatch(&walsnd->latch); - - /* - * Mark WalSnd struct no longer in use. Assume that no lock is required - * for this. - */ + SpinLockAcquire(&walsnd->mutex); + /* clear latch while holding the spinlock, so it can safely be read */ + walsnd->latch = NULL; + /* Mark WalSnd struct as no longer being in use. */ walsnd->pid = 0; + SpinLockRelease(&walsnd->mutex); } /* @@ -2570,8 +2560,8 @@ WalSndSigHupHandler(SIGNAL_ARGS) int save_errno = errno; got_SIGHUP = true; - if (MyWalSnd) - SetLatch(&MyWalSnd->latch); + + SetLatch(MyLatch); errno = save_errno; } @@ -2603,8 +2593,7 @@ WalSndLastCycleHandler(SIGNAL_ARGS) kill(MyProcPid, SIGTERM); walsender_ready_to_stop = true; - if (MyWalSnd) - SetLatch(&MyWalSnd->latch); + SetLatch(MyLatch); errno = save_errno; } @@ -2668,7 +2657,6 @@ WalSndShmemInit(void) WalSnd *walsnd = &WalSndCtl->walsnds[i]; SpinLockInit(&walsnd->mutex); - InitSharedLatch(&walsnd->latch); } } } @@ -2685,7 +2673,21 @@ WalSndWakeup(void) int i; for (i = 0; i < max_wal_senders; i++) - SetLatch(&WalSndCtl->walsnds[i].latch); + { + Latch *latch; + WalSnd *walsnd = &WalSndCtl->walsnds[i]; + + /* + * Get latch pointer with spinlock held, for the unlikely case that + * pointer reads aren't atomic (as they're 8 bytes). + */ + SpinLockAcquire(&walsnd->mutex); + latch = walsnd->latch; + SpinLockRelease(&walsnd->mutex); + + if (latch != NULL) + SetLatch(latch); + } } /* Set state for current walsender (only called in walsender) */ diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index cc351d6f67..88677506f3 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -51,10 +51,10 @@ typedef struct WalSnd slock_t mutex; /* - * Latch used by backends to wake up this walsender when it has work to - * do. + * Pointer to the walsender's latch. Used by backends to wake up this + * walsender when it has work to do. NULL if the walsender isn't active. */ - Latch latch; + Latch *latch; /* * The priority order of the standby managed by this WALSender, as listed