From de829ddf23f69190efb4e0178704c4c4228e17cd Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Fri, 12 Mar 2021 19:07:27 +1300 Subject: [PATCH] Add condition variable for walreceiver shutdown. Use this new CV to wait for walreceiver shutdown without a sleep/poll loop, while also benefiting from standard postmaster death handling. Discussion: https://postgr.es/m/CA%2BhUKGK1607VmtrDUHQXrsooU%3Dap4g4R2yaoByWOOA3m8xevUQ%40mail.gmail.com --- doc/src/sgml/monitoring.sgml | 4 +++ src/backend/postmaster/pgstat.c | 3 ++ src/backend/replication/walreceiver.c | 3 ++ src/backend/replication/walreceiverfuncs.c | 41 +++++++++++++++------- src/include/pgstat.h | 1 + src/include/replication/walreceiver.h | 2 ++ 6 files changed, 41 insertions(+), 13 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 1ba813bbb9..c35045faa1 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1766,6 +1766,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser Waiting for confirmation from a remote server during synchronous replication. + + WalrcvExit + Waiting for the walreceiver to exit. + XactGroupUpdate Waiting for the group leader to update transaction status at diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 68eefb9722..b1e2d94951 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -4124,6 +4124,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_SYNC_REP: event_name = "SyncRep"; break; + case WAIT_EVENT_WALRCV_EXIT: + event_name = "WalrcvExit"; + break; case WAIT_EVENT_XACT_GROUP_UPDATE: event_name = "XactGroupUpdate"; break; diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index e5f8a06fea..8532296f26 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -207,6 +207,7 @@ WalReceiverMain(void) case WALRCV_STOPPED: SpinLockRelease(&walrcv->mutex); + ConditionVariableBroadcast(&walrcv->walRcvStoppedCV); proc_exit(1); break; @@ -784,6 +785,8 @@ WalRcvDie(int code, Datum arg) walrcv->latch = NULL; SpinLockRelease(&walrcv->mutex); + ConditionVariableBroadcast(&walrcv->walRcvStoppedCV); + /* Terminate the connection gracefully. */ if (wrconn != NULL) walrcv_disconnect(wrconn); diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index 63e60478ea..fff6c54c45 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -23,6 +23,7 @@ #include #include "access/xlog_internal.h" +#include "pgstat.h" #include "postmaster/startup.h" #include "replication/walreceiver.h" #include "storage/pmsignal.h" @@ -62,6 +63,7 @@ WalRcvShmemInit(void) /* First time through, so initialize */ MemSet(WalRcv, 0, WalRcvShmemSize()); WalRcv->walRcvState = WALRCV_STOPPED; + ConditionVariableInit(&WalRcv->walRcvStoppedCV); SpinLockInit(&WalRcv->mutex); pg_atomic_init_u64(&WalRcv->writtenUpto, 0); WalRcv->latch = NULL; @@ -95,12 +97,18 @@ WalRcvRunning(void) if ((now - startTime) > WALRCV_STARTUP_TIMEOUT) { + bool stopped = false; + SpinLockAcquire(&walrcv->mutex); - if (walrcv->walRcvState == WALRCV_STARTING) + { state = walrcv->walRcvState = WALRCV_STOPPED; - + stopped = true; + } SpinLockRelease(&walrcv->mutex); + + if (stopped) + ConditionVariableBroadcast(&walrcv->walRcvStoppedCV); } } @@ -140,12 +148,18 @@ WalRcvStreaming(void) if ((now - startTime) > WALRCV_STARTUP_TIMEOUT) { + bool stopped = false; + SpinLockAcquire(&walrcv->mutex); - if (walrcv->walRcvState == WALRCV_STARTING) + { state = walrcv->walRcvState = WALRCV_STOPPED; - + stopped = true; + } SpinLockRelease(&walrcv->mutex); + + if (stopped) + ConditionVariableBroadcast(&walrcv->walRcvStoppedCV); } } @@ -165,6 +179,7 @@ ShutdownWalRcv(void) { WalRcvData *walrcv = WalRcv; pid_t walrcvpid = 0; + bool stopped = false; /* * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED @@ -178,6 +193,7 @@ ShutdownWalRcv(void) break; case WALRCV_STARTING: walrcv->walRcvState = WALRCV_STOPPED; + stopped = true; break; case WALRCV_STREAMING: @@ -191,6 +207,10 @@ ShutdownWalRcv(void) } SpinLockRelease(&walrcv->mutex); + /* Unnecessary but consistent. */ + if (stopped) + ConditionVariableBroadcast(&walrcv->walRcvStoppedCV); + /* * Signal walreceiver process if it was still running. */ @@ -201,16 +221,11 @@ ShutdownWalRcv(void) * Wait for walreceiver to acknowledge its death by setting state to * WALRCV_STOPPED. */ + ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV); while (WalRcvRunning()) - { - /* - * This possibly-long loop needs to handle interrupts of startup - * process. - */ - HandleStartupProcInterrupts(); - - pg_usleep(100000); /* 100ms */ - } + ConditionVariableSleep(&walrcv->walRcvStoppedCV, + WAIT_EVENT_WALRCV_EXIT); + ConditionVariableCancelSleep(); } /* diff --git a/src/include/pgstat.h b/src/include/pgstat.h index f9166b8655..be43c04802 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -1009,6 +1009,7 @@ typedef enum WAIT_EVENT_REPLICATION_SLOT_DROP, WAIT_EVENT_SAFE_SNAPSHOT, WAIT_EVENT_SYNC_REP, + WAIT_EVENT_WALRCV_EXIT, WAIT_EVENT_XACT_GROUP_UPDATE } WaitEventIPC; diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index a97a59a6a3..4fd7c25ea7 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -19,6 +19,7 @@ #include "port/atomics.h" #include "replication/logicalproto.h" #include "replication/walsender.h" +#include "storage/condition_variable.h" #include "storage/latch.h" #include "storage/spin.h" #include "utils/tuplestore.h" @@ -62,6 +63,7 @@ typedef struct */ pid_t pid; WalRcvState walRcvState; + ConditionVariable walRcvStoppedCV; pg_time_t startTime; /*