diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 5640c0d84a..79ca45a156 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1690,6 +1690,11 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i backup: This WAL sender is sending a backup. + + + stopping: This WAL sender is stopping. + + diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 35ee7d1cb6..70d2570dc2 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -8324,6 +8324,17 @@ ShutdownXLOG(int code, Datum arg) ereport(IsPostmasterEnvironment ? LOG : NOTICE, (errmsg("shutting down"))); + /* + * Signal walsenders to move to stopping state. + */ + WalSndInitStopping(); + + /* + * Wait for WAL senders to be in stopping state. This prevents commands + * from writing new WAL. + */ + WalSndWaitStopping(); + if (RecoveryInProgress()) CreateRestartPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE); else diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 27aa3e6bc7..c9a6f7019d 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -24,11 +24,17 @@ * are treated as not a crash but approximately normal termination; * the walsender will exit quickly without sending any more XLOG records. * - * If the server is shut down, postmaster sends us SIGUSR2 after all - * regular backends have exited and the shutdown checkpoint has been written. - * This instructs walsender to send any outstanding WAL, including the - * shutdown checkpoint record, wait for it to be replicated to the standby, - * and then exit. + * If the server is shut down, checkpointer sends us + * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited. If + * the backend is idle or runs an SQL query this causes the backend to + * shutdown, if logical replication is in progress all existing WAL records + * are processed followed by a shutdown. Otherwise this causes the walsender + * to switch to the "stopping" state. In this state, the walsender will reject + * any further replication commands. The checkpointer begins the shutdown + * checkpoint once all walsenders are confirmed as stopping. When the shutdown + * checkpoint finishes, the postmaster sends us SIGUSR2. This instructs + * walsender to send any outstanding WAL, including the shutdown checkpoint + * record, wait for it to be replicated to the standby, and then exit. * * * Portions Copyright (c) 2010-2017, PostgreSQL Global Development Group @@ -177,13 +183,14 @@ static bool WalSndCaughtUp = false; /* Flags set by signal handlers for later service in main loop */ static volatile sig_atomic_t got_SIGHUP = false; -static volatile sig_atomic_t walsender_ready_to_stop = false; +static volatile sig_atomic_t got_SIGUSR2 = false; +static volatile sig_atomic_t got_STOPPING = false; /* - * This is set while we are streaming. When not set, SIGUSR2 signal will be - * handled like SIGTERM. When set, the main loop is responsible for checking - * walsender_ready_to_stop and terminating when it's set (after streaming any - * remaining WAL). + * This is set while we are streaming. When not set + * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set, + * the main loop is responsible for checking got_STOPPING and terminating when + * it's set (after streaming any remaining WAL). */ static volatile sig_atomic_t replication_active = false; @@ -300,7 +307,8 @@ WalSndErrorCleanup(void) ReplicationSlotCleanup(); replication_active = false; - if (walsender_ready_to_stop) + + if (got_STOPPING || got_SIGUSR2) proc_exit(0); /* Revert back to startup state */ @@ -677,7 +685,7 @@ StartReplication(StartReplicationCmd *cmd) WalSndLoop(XLogSendPhysical); replication_active = false; - if (walsender_ready_to_stop) + if (got_STOPPING) proc_exit(0); WalSndSetState(WALSNDSTATE_STARTUP); @@ -1055,7 +1063,7 @@ StartLogicalReplication(StartReplicationCmd *cmd) { ereport(LOG, (errmsg("terminating walsender process after promotion"))); - walsender_ready_to_stop = true; + got_STOPPING = true; } WalSndSetState(WALSNDSTATE_CATCHUP); @@ -1106,7 +1114,7 @@ StartLogicalReplication(StartReplicationCmd *cmd) ReplicationSlotRelease(); replication_active = false; - if (walsender_ready_to_stop) + if (got_STOPPING) proc_exit(0); WalSndSetState(WALSNDSTATE_STARTUP); @@ -1311,6 +1319,14 @@ WalSndWaitForWal(XLogRecPtr loc) /* Check for input from the client */ ProcessRepliesIfAny(); + /* + * If we're shutting down, trigger pending WAL to be written out, + * otherwise we'd possibly end up waiting for WAL that never gets + * written, because walwriter has shut down already. + */ + if (got_STOPPING) + XLogBackgroundFlush(); + /* Update our idea of the currently flushed position. */ if (!RecoveryInProgress()) RecentFlushPtr = GetFlushRecPtr(); @@ -1326,7 +1342,7 @@ WalSndWaitForWal(XLogRecPtr loc) * RecentFlushPtr, so we can send all remaining data before shutting * down. */ - if (walsender_ready_to_stop) + if (got_STOPPING) break; /* @@ -1400,6 +1416,22 @@ exec_replication_command(const char *cmd_string) MemoryContext cmd_context; MemoryContext old_context; + /* + * If WAL sender has been told that shutdown is getting close, switch its + * status accordingly to handle the next replication commands correctly. + */ + if (got_STOPPING) + WalSndSetState(WALSNDSTATE_STOPPING); + + /* + * Throw error if in stopping mode. We need prevent commands that could + * generate WAL while the shutdown checkpoint is being written. To be + * safe, we just prohibit all new commands. + */ + if (MyWalSnd->state == WALSNDSTATE_STOPPING) + ereport(ERROR, + (errmsg("cannot execute new commands while WAL sender is in stopping mode"))); + /* * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next * command arrives. Clean up the old stuff if there's anything. @@ -2128,7 +2160,7 @@ WalSndLoop(WalSndSendDataCallback send_data) * normal termination at shutdown, or a promotion, the walsender * is not sure which. */ - if (walsender_ready_to_stop) + if (got_SIGUSR2) WalSndDone(send_data); } @@ -2443,6 +2475,10 @@ XLogSendPhysical(void) XLogRecPtr endptr; Size nbytes; + /* If requested switch the WAL sender to the stopping state. */ + if (got_STOPPING) + WalSndSetState(WALSNDSTATE_STOPPING); + if (streamingDoneSending) { WalSndCaughtUp = true; @@ -2733,7 +2769,16 @@ XLogSendLogical(void) * point, then we're caught up. */ if (logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr()) + { WalSndCaughtUp = true; + + /* + * Have WalSndLoop() terminate the connection in an orderly + * manner, after writing out all the pending data. + */ + if (got_STOPPING) + got_SIGUSR2 = true; + } } /* Update shared memory status */ @@ -2843,6 +2888,26 @@ WalSndRqstFileReload(void) } } +/* + * Handle PROCSIG_WALSND_INIT_STOPPING signal. + */ +void +HandleWalSndInitStopping(void) +{ + Assert(am_walsender); + + /* + * If replication has not yet started, die like with SIGTERM. If + * replication is active, only set a flag and wake up the main loop. It + * will send any outstanding WAL, wait for it to be replicated to the + * standby, and then exit gracefully. + */ + if (!replication_active) + kill(MyProcPid, SIGTERM); + else + got_STOPPING = true; +} + /* SIGHUP: set flag to re-read config file at next convenient time */ static void WalSndSigHupHandler(SIGNAL_ARGS) @@ -2856,22 +2921,17 @@ WalSndSigHupHandler(SIGNAL_ARGS) errno = save_errno; } -/* SIGUSR2: set flag to do a last cycle and shut down afterwards */ +/* + * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL + * sender should already have been switched to WALSNDSTATE_STOPPING at + * this point. + */ static void WalSndLastCycleHandler(SIGNAL_ARGS) { int save_errno = errno; - /* - * If replication has not yet started, die like with SIGTERM. If - * replication is active, only set a flag and wake up the main loop. It - * will send any outstanding WAL, wait for it to be replicated to the - * standby, and then exit gracefully. - */ - if (!replication_active) - kill(MyProcPid, SIGTERM); - - walsender_ready_to_stop = true; + got_SIGUSR2 = true; SetLatch(MyLatch); errno = save_errno; @@ -2969,6 +3029,77 @@ WalSndWakeup(void) } } +/* + * Signal all walsenders to move to stopping state. + * + * This will trigger walsenders to move to a state where no further WAL can be + * generated. See this file's header for details. + */ +void +WalSndInitStopping(void) +{ + int i; + + for (i = 0; i < max_wal_senders; i++) + { + WalSnd *walsnd = &WalSndCtl->walsnds[i]; + pid_t pid; + + SpinLockAcquire(&walsnd->mutex); + pid = walsnd->pid; + SpinLockRelease(&walsnd->mutex); + + if (pid == 0) + continue; + + SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, InvalidBackendId); + } +} + +/* + * Wait that all the WAL senders have quit or reached the stopping state. This + * is used by the checkpointer to control when the shutdown checkpoint can + * safely be performed. + */ +void +WalSndWaitStopping(void) +{ + for (;;) + { + int i; + bool all_stopped = true; + + for (i = 0; i < max_wal_senders; i++) + { + WalSndState state; + WalSnd *walsnd = &WalSndCtl->walsnds[i]; + + SpinLockAcquire(&walsnd->mutex); + + if (walsnd->pid == 0) + { + SpinLockRelease(&walsnd->mutex); + continue; + } + + state = walsnd->state; + SpinLockRelease(&walsnd->mutex); + + if (state != WALSNDSTATE_STOPPING) + { + all_stopped = false; + break; + } + } + + /* safe to leave if confirmation is done for all WAL senders */ + if (all_stopped) + return; + + pg_usleep(10000L); /* wait for 10 msec */ + } +} + /* Set state for current walsender (only called in walsender) */ void WalSndSetState(WalSndState state) @@ -3002,6 +3133,8 @@ WalSndGetStateString(WalSndState state) return "catchup"; case WALSNDSTATE_STREAMING: return "streaming"; + case WALSNDSTATE_STOPPING: + return "stopping"; } return "UNKNOWN"; } diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index 4a21d5512d..b9302ac630 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -20,6 +20,7 @@ #include "access/parallel.h" #include "commands/async.h" #include "miscadmin.h" +#include "replication/walsender.h" #include "storage/latch.h" #include "storage/ipc.h" #include "storage/proc.h" @@ -270,6 +271,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS) if (CheckProcSignal(PROCSIG_PARALLEL_MESSAGE)) HandleParallelMessageInterrupt(); + if (CheckProcSignal(PROCSIG_WALSND_INIT_STOPPING)) + HandleWalSndInitStopping(); + if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_DATABASE)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_DATABASE); diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 2ca903872e..c50e450ec2 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -44,6 +44,9 @@ extern void WalSndSignals(void); extern Size WalSndShmemSize(void); extern void WalSndShmemInit(void); extern void WalSndWakeup(void); +extern void WalSndInitStopping(void); +extern void WalSndWaitStopping(void); +extern void HandleWalSndInitStopping(void); extern void WalSndRqstFileReload(void); /* diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 2c59056cef..36311e124c 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -24,7 +24,8 @@ typedef enum WalSndState WALSNDSTATE_STARTUP = 0, WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, - WALSNDSTATE_STREAMING + WALSNDSTATE_STREAMING, + WALSNDSTATE_STOPPING } WalSndState; /* diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index d068dde5d7..553f0f43f7 100644 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -32,6 +32,7 @@ typedef enum PROCSIG_CATCHUP_INTERRUPT, /* sinval catchup interrupt */ PROCSIG_NOTIFY_INTERRUPT, /* listen/notify interrupt */ PROCSIG_PARALLEL_MESSAGE, /* message from cooperating parallel backend */ + PROCSIG_WALSND_INIT_STOPPING, /* ask walsenders to prepare for shutdown */ /* Recovery conflict reasons */ PROCSIG_RECOVERY_CONFLICT_DATABASE,