diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index a3c5f86b7e..dcb58115af 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1473,7 +1473,7 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
 Waiting to apply WAL at recovery because it is delayed.
 
 
-IO
+IO
 BufFileRead
 Waiting for a read from a buffered file.
 
@@ -1593,6 +1593,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
 LogicalRewriteWrite
 Waiting for a write of logical rewrite mappings.
 
+
+ProcSignalBarrier
+Waiting for a barrier event to be processed by all backends.
+
 
 RelationMapRead
 Waiting for a read of the relation map file.
diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c
index 7fe9e1d440..e919317bab 100644
--- a/src/backend/postmaster/autovacuum.c
+++ b/src/backend/postmaster/autovacuum.c
@@ -820,6 +820,10 @@ HandleAutoVacLauncherInterrupts(void)
 		rebuild_database_list(InvalidOid);
 	}
 
+	/* Process barrier events */
+	if (ProcSignalBarrierPending)
+		ProcessProcSignalBarrier();
+
 	/* Process sinval catchup interrupts that happened while sleeping */
 	ProcessCatchupInterrupt();
 }
diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index 014b9e5289..9b2277d4ae 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -524,6 +524,9 @@ CheckpointerMain(void)
 static void
 HandleCheckpointerInterrupts(void)
 {
+	if (ProcSignalBarrierPending)
+		ProcessProcSignalBarrier();
+
 	if (ConfigReloadPending)
 	{
 		ConfigReloadPending = false;
@@ -710,6 +713,10 @@ CheckpointWriteDelay(int flags, double progress)
 		AbsorbSyncRequests();
 		absorb_counter = WRITES_PER_ABSORB;
 	}
+
+	/* Check for barrier events. */
+	if (ProcSignalBarrierPending)
+		ProcessProcSignalBarrier();
 }
 
 /*
diff --git a/src/backend/postmaster/interrupt.c b/src/backend/postmaster/interrupt.c
index 6900cd02f6..214ccaf34b 100644
--- a/src/backend/postmaster/interrupt.c
+++ b/src/backend/postmaster/interrupt.c
@@ -20,6 +20,7 @@
 #include "postmaster/interrupt.h"
 #include "storage/ipc.h"
 #include "storage/latch.h"
+#include "storage/procsignal.h"
 #include "utils/guc.h"
 
 volatile sig_atomic_t ConfigReloadPending = false;
@@ -31,6 +32,9 @@ volatile sig_atomic_t ShutdownRequestPending = false;
 void
 HandleMainLoopInterrupts(void)
 {
+	if (ProcSignalBarrierPending)
+		ProcessProcSignalBarrier();
+
 	if (ConfigReloadPending)
 	{
 		ConfigReloadPending = false;
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index e931512203..7410b2ff5e 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3988,6 +3988,9 @@ pgstat_get_wait_io(WaitEventIO w)
 		case WAIT_EVENT_LOGICAL_REWRITE_WRITE:
 			event_name = "LogicalRewriteWrite";
 			break;
+		case WAIT_EVENT_PROC_SIGNAL_BARRIER:
+			event_name = "ProcSignalBarrier";
+			break;
 		case WAIT_EVENT_RELATION_MAP_READ:
 			event_name = "RelationMapRead";
 			break;
diff --git a/src/backend/postmaster/startup.c b/src/backend/postmaster/startup.c
index 4f59c71f73..11f7052e78 100644
--- a/src/backend/postmaster/startup.c
+++ b/src/backend/postmaster/startup.c
@@ -96,7 +96,7 @@ StartupProcShutdownHandler(SIGNAL_ARGS)
 	errno = save_errno;
 }
 
-/* Handle SIGHUP and SIGTERM signals of startup process */
+/* Handle various signals that might be sent to the startup process */
 void
 HandleStartupProcInterrupts(void)
 {
@@ -121,6 +121,10 @@ HandleStartupProcInterrupts(void)
 	 */
 	if (IsUnderPostmaster && !PostmasterIsAlive())
 		exit(1);
+
+	/* Process barrier events */
+	if (ProcSignalBarrierPending)
+		ProcessProcSignalBarrier();
 }
 
 
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index c36bcc08ec..a4de8a9cd8 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -148,7 +148,8 @@ ProcessWalRcvInterrupts(void)
 	/*
 	 * Although walreceiver interrupt handling doesn't use the same scheme as
 	 * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
-	 * any incoming signals on Win32.
+	 * any incoming signals on Win32, and also to make sure we process any
+	 * barrier events.
 	 */
 	CHECK_FOR_INTERRUPTS();
 
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 7ad10736d5..1f10a97dc7 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1852,6 +1852,10 @@ BufferSync(int flags)
 		}
 
 		UnlockBufHdr(bufHdr, buf_state);
+
+		/* Check for barrier events in case NBuffers is large. */
+		if (ProcSignalBarrierPending)
+			ProcessProcSignalBarrier();
 	}
 
 	if (num_to_scan == 0)
@@ -1930,6 +1934,10 @@ BufferSync(int flags)
 		}
 
 		s->num_to_scan++;
+
+		/* Check for barrier events. */
+		if (ProcSignalBarrierPending)
+			ProcessProcSignalBarrier();
 	}
 
 	Assert(num_spaces > 0);
@@ -2018,6 +2026,8 @@ BufferSync(int flags)
 
 		/*
 		 * Sleep to throttle our I/O rate.
+		 *
+		 * (This will check for barrier events even if it doesn't sleep.)
 		 */
 		CheckpointWriteDelay(flags, (double) num_processed / num_to_scan);
 	}
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index fde97a1036..06e00eae15 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -20,6 +20,7 @@
 #include "access/parallel.h"
 #include "commands/async.h"
 #include "miscadmin.h"
+#include "pgstat.h"
 #include "replication/walsender.h"
 #include "storage/ipc.h"
 #include "storage/latch.h"
@@ -45,13 +46,36 @@
  * The flags are actually declared as "volatile sig_atomic_t" for maximum
  * portability. This should ensure that loads and stores of the flag
  * values are atomic, allowing us to dispense with any explicit locking.
+ *
+ * pss_signalFlags are intended to be set in cases where we don't need to
+ * keep track of whether or not the target process has handled the signal,
+ * but sometimes we need confirmation, as when making a global state change
+ * that cannot be considered complete until all backends have taken notice
+ * of it. For such use cases, we set a bit in pss_barrierCheckMask and then
+ * increment the current "barrier generation"; when the new barrier generation
+ * (or greater) appears in the pss_barrierGeneration flag of every process,
+ * we know that the message has been received everywhere.
  */
 typedef struct
 {
 	pid_t		pss_pid;
 	sig_atomic_t pss_signalFlags[NUM_PROCSIGNALS];
+	pg_atomic_uint64 pss_barrierGeneration;
+	pg_atomic_uint32 pss_barrierCheckMask;
 } ProcSignalSlot;
 
+/*
+ * Information that is global to the entire ProcSignal system can be stored
+ * here.
+ *
+ * psh_barrierGeneration is the highest barrier generation in existence.
+ */
+typedef struct
+{
+	pg_atomic_uint64 psh_barrierGeneration;
+	ProcSignalSlot psh_slot[FLEXIBLE_ARRAY_MEMBER];
+} ProcSignalHeader;
+
 /*
  * We reserve a slot for each possible BackendId, plus one for each
  * possible auxiliary process type. (This scheme assumes there is not
@@ -59,11 +83,16 @@ typedef struct
  */
 #define NumProcSignalSlots (MaxBackends + NUM_AUXPROCTYPES)
 
-static ProcSignalSlot *ProcSignalSlots = NULL;
+/* Check whether the relevant type bit is set in the flags. */
+#define BARRIER_SHOULD_CHECK(flags, type) \
+	(((flags) & (((uint32) 1) << (uint32) (type))) != 0)
+
+static ProcSignalHeader *ProcSignal = NULL;
 static volatile ProcSignalSlot *MyProcSignalSlot = NULL;
 
 static bool CheckProcSignal(ProcSignalReason reason);
 static void CleanupProcSignalState(int status, Datum arg);
+static void ProcessBarrierPlaceholder(void);
 
 /*
  * ProcSignalShmemSize
@@ -72,7 +101,11 @@ static void CleanupProcSignalState(int status, Datum arg);
 Size
 ProcSignalShmemSize(void)
 {
-	return NumProcSignalSlots * sizeof(ProcSignalSlot);
+	Size		size;
+
+	size = mul_size(NumProcSignalSlots, sizeof(ProcSignalSlot));
+	size = add_size(size, offsetof(ProcSignalHeader, psh_slot));
+	return size;
 }
 
 /*
@@ -85,12 +118,26 @@ ProcSignalShmemInit(void)
 	Size		size = ProcSignalShmemSize();
 	bool		found;
 
-	ProcSignalSlots = (ProcSignalSlot *)
-		ShmemInitStruct("ProcSignalSlots", size, &found);
+	ProcSignal = (ProcSignalHeader *)
+		ShmemInitStruct("ProcSignal", size, &found);
 
-	/* If we're first, set everything to zeroes */
+	/* If we're first, initialize. */
 	if (!found)
-		MemSet(ProcSignalSlots, 0, size);
+	{
+		int			i;
+
+		pg_atomic_init_u64(&ProcSignal->psh_barrierGeneration, 0);
+
+		for (i = 0; i < NumProcSignalSlots; ++i)
+		{
+			ProcSignalSlot *slot = &ProcSignal->psh_slot[i];
+
+			slot->pss_pid = 0;
+			MemSet(slot->pss_signalFlags, 0, sizeof(slot->pss_signalFlags));
+			pg_atomic_init_u64(&slot->pss_barrierGeneration, PG_UINT64_MAX);
+			pg_atomic_init_u32(&slot->pss_barrierCheckMask, 0);
+		}
+	}
 }
 
 /*
@@ -104,10 +151,11 @@ void
 ProcSignalInit(int pss_idx)
 {
 	volatile ProcSignalSlot *slot;
+	uint64		barrier_generation;
 
 	Assert(pss_idx >= 1 && pss_idx <= NumProcSignalSlots);
 
-	slot = &ProcSignalSlots[pss_idx - 1];
+	slot = &ProcSignal->psh_slot[pss_idx - 1];
 
 	/* sanity check */
 	if (slot->pss_pid != 0)
@@ -117,6 +165,23 @@ ProcSignalInit(int pss_idx)
 	/* Clear out any leftover signal reasons */
 	MemSet(slot->pss_signalFlags, 0, NUM_PROCSIGNALS * sizeof(sig_atomic_t));
 
+	/*
+	 * Initialize barrier state. Since we're a brand-new process, there
+	 * shouldn't be any leftover backend-private state that needs to be
+	 * updated. Therefore, we can broadcast the latest barrier generation
+	 * and disregard any previously-set check bits.
+	 *
+	 * NB: This only works if this initialization happens early enough in the
+	 * startup sequence that we haven't yet cached any state that might need
+	 * to be invalidated. That's also why we have a memory barrier here, to
+	 * be sure that any later reads of memory happen strictly after this.
+	 */
+	pg_atomic_write_u32(&slot->pss_barrierCheckMask, 0);
+	barrier_generation =
+		pg_atomic_read_u64(&ProcSignal->psh_barrierGeneration);
+	pg_atomic_write_u64(&slot->pss_barrierGeneration, barrier_generation);
+	pg_memory_barrier();
+
 	/* Mark slot with my PID */
 	slot->pss_pid = MyProcPid;
 
@@ -129,7 +194,7 @@ ProcSignalInit(int pss_idx)
 
 /*
  * CleanupProcSignalState
- *		Remove current process from ProcSignalSlots
+ *		Remove current process from ProcSignal mechanism
  *
  * This function is called via on_shmem_exit() during backend shutdown.
  */
@@ -139,7 +204,7 @@ CleanupProcSignalState(int status, Datum arg)
 	int			pss_idx = DatumGetInt32(arg);
 	volatile ProcSignalSlot *slot;
 
-	slot = &ProcSignalSlots[pss_idx - 1];
+	slot = &ProcSignal->psh_slot[pss_idx - 1];
 	Assert(slot == MyProcSignalSlot);
 
 	/*
@@ -161,6 +226,12 @@ CleanupProcSignalState(int status, Datum arg)
 		return;					/* XXX better to zero the slot anyway? */
 	}
 
+	/*
+	 * Make this slot look like it's absorbed all possible barriers, so that
+	 * no barrier waits block on it.
+	 */
+	pg_atomic_write_u64(&slot->pss_barrierGeneration, PG_UINT64_MAX);
+
 	slot->pss_pid = 0;
 }
 
@@ -182,7 +253,7 @@ SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
 
 	if (backendId != InvalidBackendId)
 	{
-		slot = &ProcSignalSlots[backendId - 1];
+		slot = &ProcSignal->psh_slot[backendId - 1];
 
 		/*
 		 * Note: Since there's no locking, it's possible that the target
@@ -212,7 +283,7 @@ SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
 
 		for (i = NumProcSignalSlots - 1; i >= 0; i--)
 		{
-			slot = &ProcSignalSlots[i];
+			slot = &ProcSignal->psh_slot[i];
 
 			if (slot->pss_pid == pid)
 			{
@@ -230,6 +301,187 @@ SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
 	return -1;
 }
 
+/*
+ * EmitProcSignalBarrier
+ *		Send a signal to every Postgres process
+ *
+ * The return value of this function is the barrier "generation" created
+ * by this operation. This value can be passed to WaitForProcSignalBarrier
+ * to wait until it is known that every participant in the ProcSignal
+ * mechanism has absorbed the signal (or started afterwards).
+ *
+ * Note that it would be a bad idea to use this for anything that happens
+ * frequently, as interrupting every backend could cause a noticeable
+ * performance hit.
+ *
+ * Callers are entitled to assume that this function will not throw ERROR
+ * or FATAL.
+ */
+uint64
+EmitProcSignalBarrier(ProcSignalBarrierType type)
+{
+	uint32		flagbit = ((uint32) 1) << (uint32) type;
+	uint64		generation;
+
+	/*
+	 * Set all the flags (flagbit is 32 bits wide, like pss_barrierCheckMask).
+	 *
+	 * Note that pg_atomic_fetch_or_u32 has full barrier semantics, so this
+	 * is totally ordered with respect to anything the caller did before, and
+	 * anything that we do afterwards. (This is also true of the later call
+	 * to pg_atomic_add_fetch_u64.)
+	 */
+	for (int i = 0; i < NumProcSignalSlots; i++)
+	{
+		volatile ProcSignalSlot *slot = &ProcSignal->psh_slot[i];
+
+		pg_atomic_fetch_or_u32(&slot->pss_barrierCheckMask, flagbit);
+	}
+
+	/*
+	 * Increment the generation counter.
+	 */
+	generation =
+		pg_atomic_add_fetch_u64(&ProcSignal->psh_barrierGeneration, 1);
+
+	/*
+	 * Signal all the processes, so that they update their advertised barrier
+	 * generation.
+	 *
+	 * Concurrency is not a problem here. Backends that have exited don't
+	 * matter, and new backends that have joined since we entered this function
+	 * must already have current state, since the caller is responsible for
+	 * making sure that the relevant state is entirely visible before calling
+	 * this function in the first place. We still have to wake them up -
+	 * because we can't distinguish between such backends and older backends
+	 * that need to update state - but they won't actually need to change
+	 * any state.
+	 */
+	for (int i = NumProcSignalSlots - 1; i >= 0; i--)
+	{
+		volatile ProcSignalSlot *slot = &ProcSignal->psh_slot[i];
+		pid_t		pid = slot->pss_pid;
+
+		if (pid != 0)
+			kill(pid, SIGUSR1);
+	}
+
+	return generation;
+}
+
+/*
+ * WaitForProcSignalBarrier - wait until it is guaranteed that all changes
+ * requested by a specific call to EmitProcSignalBarrier() have taken effect.
+ *
+ * We expect that the barrier will normally be absorbed very quickly by other
+ * backends, so we start by waiting just 1/8 of a second and then back off
+ * by a factor of two every time we time out, to a maximum wait time of
+ * 1 second.
+ */
+void
+WaitForProcSignalBarrier(uint64 generation)
+{
+	long		timeout = 125L;
+
+	for (int i = NumProcSignalSlots - 1; i >= 0; i--)
+	{
+		volatile ProcSignalSlot *slot = &ProcSignal->psh_slot[i];
+		uint64		oldval;
+
+		oldval = pg_atomic_read_u64(&slot->pss_barrierGeneration);
+		while (oldval < generation)
+		{
+			int			events;
+
+			CHECK_FOR_INTERRUPTS();
+
+			events =
+				WaitLatch(MyLatch,
+						  WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+						  timeout, WAIT_EVENT_PROC_SIGNAL_BARRIER);
+			ResetLatch(MyLatch);
+
+			oldval = pg_atomic_read_u64(&slot->pss_barrierGeneration);
+			if (events & WL_TIMEOUT)
+				timeout = Min(timeout * 2, 1000L);
+		}
+	}
+
+	/*
+	 * The caller is probably calling this function because it wants to
+	 * read the shared state or perform further writes to shared state once
+	 * all backends are known to have absorbed the barrier. However, the
+	 * read of pss_barrierGeneration was performed unlocked; insert a memory
+	 * barrier to separate it from whatever follows.
+	 */
+	pg_memory_barrier();
+}
+
+/*
+ * Perform global barrier related interrupt checking.
+ *
+ * Any backend that participates in ProcSignal signalling must arrange to
+ * call this function periodically. It is called from CHECK_FOR_INTERRUPTS(),
+ * which is enough for normal backends, but not necessarily for all types of
+ * background processes.
+ */
+void
+ProcessProcSignalBarrier(void)
+{
+	uint64		generation;
+	uint32		flags;
+
+	/* Exit quickly if there's no work to do. */
+	if (!ProcSignalBarrierPending)
+		return;
+	ProcSignalBarrierPending = false;
+
+	/*
+	 * Read the current barrier generation, and then get the flags that
+	 * are set for this backend. Note that pg_atomic_exchange_u32 is a full
+	 * barrier, so we're guaranteed that the read of the barrier generation
+	 * happens before we atomically extract the flags, and that any subsequent
+	 * state changes happen afterward.
+	 */
+	generation = pg_atomic_read_u64(&ProcSignal->psh_barrierGeneration);
+	flags = pg_atomic_exchange_u32(&MyProcSignalSlot->pss_barrierCheckMask, 0);
+
+	/*
+	 * Process each type of barrier. It's important that nothing we call from
+	 * here throws an error, because pss_barrierCheckMask has already been
+	 * cleared. If we jumped out of here before processing all barrier types,
+	 * then we'd forget about the need to do so later.
+	 *
+	 * NB: It ought to be OK to call the barrier-processing functions
+	 * unconditionally, but it's more efficient to call only the ones that
+	 * might need us to do something based on the flags.
+	 */
+	if (BARRIER_SHOULD_CHECK(flags, PROCSIGNAL_BARRIER_PLACEHOLDER))
+		ProcessBarrierPlaceholder();
+
+	/*
+	 * State changes related to all types of barriers that might have been
+	 * emitted have now been handled, so we can update our notion of the
+	 * generation to the one we observed before beginning the updates. If
+	 * things have changed further, it'll get fixed up when this function is
+	 * next called.
+	 */
+	pg_atomic_write_u64(&MyProcSignalSlot->pss_barrierGeneration, generation);
+}
+
+static void
+ProcessBarrierPlaceholder(void)
+{
+	/*
+	 * XXX. This is just a placeholder until the first real user of this
+	 * machinery gets committed. Rename PROCSIGNAL_BARRIER_PLACEHOLDER to
+	 * PROCSIGNAL_BARRIER_SOMETHING_ELSE where SOMETHING_ELSE is something
+	 * appropriately descriptive. Get rid of this function and instead have
+	 * ProcessBarrierSomethingElse. Most likely, that function should live
+	 * in the file pertaining to that subsystem, rather than here.
+	 */
+}
+
 /*
  * CheckProcSignal - check to see if a particular reason has been
  * signaled, and clear the signal flag. Should be called after receiving
@@ -253,6 +505,27 @@ CheckProcSignal(ProcSignalReason reason)
 	return false;
 }
 
+/*
+ * CheckProcSignalBarrier - check for new barriers we need to absorb
+ */
+static bool
+CheckProcSignalBarrier(void)
+{
+	volatile ProcSignalSlot *slot = MyProcSignalSlot;
+
+	if (slot != NULL)
+	{
+		uint64		mygen;
+		uint64		curgen;
+
+		mygen = pg_atomic_read_u64(&slot->pss_barrierGeneration);
+		curgen = pg_atomic_read_u64(&ProcSignal->psh_barrierGeneration);
+		return (mygen != curgen);
+	}
+
+	return false;
+}
+
 /*
  * procsignal_sigusr1_handler - handle SIGUSR1 signal.
  */
@@ -291,6 +564,12 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN))
 		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
 
+	if (CheckProcSignalBarrier())
+	{
+		InterruptPending = true;
+		ProcSignalBarrierPending = true;
+	}
+
 	SetLatch(MyLatch);
 
 	latch_sigusr1_handler();
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index ef5a952968..c42d9ce09a 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -3180,6 +3180,9 @@ ProcessInterrupts(void)
 
 	}
 
+	if (ProcSignalBarrierPending)
+		ProcessProcSignalBarrier();
+
 	if (ParallelMessagePending)
 		HandleParallelMessages();
 }
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index 4473e18e53..3a091022e2 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -32,6 +32,7 @@ volatile sig_atomic_t QueryCancelPending = false;
 volatile sig_atomic_t ProcDiePending = false;
 volatile sig_atomic_t ClientConnectionLost = false;
 volatile sig_atomic_t IdleInTransactionSessionTimeoutPending = false;
+volatile sig_atomic_t ProcSignalBarrierPending = false;
 volatile uint32 InterruptHoldoffCount = 0;
 volatile uint32 QueryCancelHoldoffCount = 0;
 volatile uint32 CritSectionCount = 0;
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 24f43ad686..ed80f1d668 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -82,6 +82,7 @@ extern PGDLLIMPORT volatile sig_atomic_t InterruptPending;
 extern PGDLLIMPORT volatile sig_atomic_t QueryCancelPending;
 extern PGDLLIMPORT volatile sig_atomic_t ProcDiePending;
 extern PGDLLIMPORT volatile sig_atomic_t IdleInTransactionSessionTimeoutPending;
+extern PGDLLIMPORT volatile sig_atomic_t ProcSignalBarrierPending;
 
 extern PGDLLIMPORT volatile sig_atomic_t ClientConnectionLost;
 
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index fe076d823d..f2e873d048 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -908,6 +908,7 @@ typedef enum
 	WAIT_EVENT_LOGICAL_REWRITE_SYNC,
 	WAIT_EVENT_LOGICAL_REWRITE_TRUNCATE,
 	WAIT_EVENT_LOGICAL_REWRITE_WRITE,
+	WAIT_EVENT_PROC_SIGNAL_BARRIER,
 	WAIT_EVENT_RELATION_MAP_READ,
 	WAIT_EVENT_RELATION_MAP_SYNC,
 	WAIT_EVENT_RELATION_MAP_WRITE,
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index 05b186a05c..b7d0d43f0d 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -45,6 +45,16 @@ typedef enum
 	NUM_PROCSIGNALS				/* Must be last! */
 } ProcSignalReason;
 
+typedef enum
+{
+	/*
+	 * XXX. PROCSIGNAL_BARRIER_PLACEHOLDER should be replaced when the first
+	 * real user of the ProcSignalBarrier mechanism is added. It's just here
+	 * for now because we can't have an empty enum.
+	 */
+	PROCSIGNAL_BARRIER_PLACEHOLDER = 0
+} ProcSignalBarrierType;
+
 /*
  * prototypes for functions in procsignal.c
  */
@@ -55,6 +65,10 @@ extern void ProcSignalInit(int pss_idx);
 extern int	SendProcSignal(pid_t pid, ProcSignalReason reason,
 						   BackendId backendId);
 
+extern uint64 EmitProcSignalBarrier(ProcSignalBarrierType type);
+extern void WaitForProcSignalBarrier(uint64 generation);
+extern void ProcessProcSignalBarrier(void);
+
 extern void procsignal_sigusr1_handler(SIGNAL_ARGS);
 
 #endif							/* PROCSIGNAL_H */