Post-commit review fixes for slot synchronization.

Allow pg_sync_replication_slots() to error out during promotion of the
standby. This makes the behavior of the SQL function consistent with the
slot sync worker. Also ensure that pg_sync_replication_slots() cannot be
executed if sync_replication_slots is enabled and the slotsync worker is
already running to perform the synchronization of slots. Previously, it
would have succeeded in cases when the worker is idle and failed when it
is performing sync, which could confuse users.

This patch fixes another issue in the slot sync worker where
SignalHandlerForShutdownRequest() needs to be registered *before* setting
SlotSyncCtx->pid; otherwise, the slotsync worker could miss handling
SIGINT sent by the startup process (ShutDownSlotSync) if it is sent before
the worker could register SignalHandlerForShutdownRequest(). To be
consistent, the registration of all signal handlers is moved to a location
before we set the worker's pid.

Ensure that we clean up synced temp slots at the end of
pg_sync_replication_slots() to avoid such slots being left over after
promotion.

Ensure that ShutDownSlotSync() captures SlotSyncCtx->pid under spinlock to
avoid accessing an invalid value, as it can be reset by a concurrent slot
sync exit due to an error.

Author: Shveta Malik
Reviewed-by: Hou Zhijie, Bertrand Drouvot, Amit Kapila, Masahiko Sawada
Discussion: https://postgr.es/m/CAJpy0uBefXUS_TSz=oxmYKHdg-fhxUT0qfjASW3nmqnzVC3p6A@mail.gmail.com
This commit is contained in:
Amit Kapila 2024-04-25 14:01:44 +05:30
parent 0afa288911
commit db08e8c6fa
6 changed files with 206 additions and 98 deletions

View File

@ -29349,6 +29349,10 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
standby server. Temporary synced slots, if any, cannot be used for
logical decoding and must be dropped after promotion. See
<xref linkend="logicaldecoding-replication-slots-synchronization"/> for details.
Note that this function cannot be executed if
<link linkend="guc-sync-replication-slots"><varname>
sync_replication_slots</varname></link> is enabled and the slotsync
worker is already running to perform the synchronization of slots.
</para>
<caution>

View File

@ -79,10 +79,11 @@
* and also sets stopSignaled=true to handle the race condition when the
* postmaster has not noticed the promotion yet and thus may end up restarting
* the slot sync worker. If stopSignaled is set, the worker will exit in such a
* case. Note that we don't need to reset this variable as after promotion the
* slot sync worker won't be restarted because the pmState changes to PM_RUN from
* PM_HOT_STANDBY and we don't support demoting primary without restarting the
* server. See MaybeStartSlotSyncWorker.
* case. The SQL function pg_sync_replication_slots() will also error out if
* this flag is set. Note that we don't need to reset this variable as after
* promotion the slot sync worker won't be restarted because the pmState
* changes to PM_RUN from PM_HOT_STANDBY and we don't support demoting
* primary without restarting the server. See MaybeStartSlotSyncWorker.
*
* The 'syncing' flag is needed to prevent concurrent slot syncs to avoid slot
* overwrites.
@ -92,9 +93,6 @@
* is expected (e.g., slot sync GUCs change), slot sync worker will reset
* last_start_time before exiting, so that postmaster can start the worker
* without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
*
* All the fields except 'syncing' are used only by slotsync worker.
* 'syncing' is used both by worker and SQL function pg_sync_replication_slots.
*/
typedef struct SlotSyncCtxStruct
{
@ -807,20 +805,6 @@ synchronize_slots(WalReceiverConn *wrconn)
" FROM pg_catalog.pg_replication_slots"
" WHERE failover and NOT temporary";
SpinLockAcquire(&SlotSyncCtx->mutex);
if (SlotSyncCtx->syncing)
{
SpinLockRelease(&SlotSyncCtx->mutex);
ereport(ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot synchronize replication slots concurrently"));
}
SlotSyncCtx->syncing = true;
SpinLockRelease(&SlotSyncCtx->mutex);
syncing_slots = true;
/* The syscache access in walrcv_exec() needs a transaction env. */
if (!IsTransactionState())
{
@ -937,12 +921,6 @@ synchronize_slots(WalReceiverConn *wrconn)
if (started_tx)
CommitTransactionCommand();
SpinLockAcquire(&SlotSyncCtx->mutex);
SlotSyncCtx->syncing = false;
SpinLockRelease(&SlotSyncCtx->mutex);
syncing_slots = false;
return some_slot_updated;
}
@ -1190,6 +1168,19 @@ ProcessSlotSyncInterrupts(WalReceiverConn *wrconn)
slotsync_reread_config();
}
/*
 * Exit-time callback that closes the slotsync worker's connection.
 *
 * 'arg' carries the WalReceiverConn pointer packed into a Datum; 'code' is
 * not used.
 */
static void
slotsync_worker_disconnect(int code, Datum arg)
{
	walrcv_disconnect((WalReceiverConn *) DatumGetPointer(arg));
}
/*
* Cleanup function for slotsync worker.
*
@ -1198,8 +1189,38 @@ ProcessSlotSyncInterrupts(WalReceiverConn *wrconn)
static void
slotsync_worker_onexit(int code, Datum arg)
{
/* Neither 'code' nor 'arg' is used by this callback. */
/*
* We need to do slots cleanup here just like WalSndErrorCleanup() does.
*
* The startup process during promotion invokes ShutDownSlotSync() which
* waits for slot sync to finish and it does that by checking the
* 'syncing' flag. Thus the slot sync worker must be done with slots'
* release and cleanup to avoid any dangling temporary slots or active
* slots before it marks itself as finished syncing.
*/
/* Make sure active replication slots are released */
if (MyReplicationSlot != NULL)
ReplicationSlotRelease();
/*
* Also cleanup the temporary slots. Passing false requests cleanup of all
* temporary slots, not only the synced ones.
*/
ReplicationSlotCleanup(false);
/*
* Withdraw the advertised pid and, if it is still set, the 'syncing' flag
* within a single spinlock-protected section.
*/
SpinLockAcquire(&SlotSyncCtx->mutex);
SlotSyncCtx->pid = InvalidPid;
/*
* If syncing_slots is true, it indicates that the process errored out
* without resetting the flag. So, we need to clean up shared memory and
* reset the flag here.
*/
if (syncing_slots)
{
SlotSyncCtx->syncing = false;
syncing_slots = false;
}
SpinLockRelease(&SlotSyncCtx->mutex);
}
@ -1242,6 +1263,64 @@ wait_for_slot_activity(bool some_slot_updated)
ResetLatch(MyLatch);
}
/*
 * Claim the right to synchronize slots, or raise an ERROR if that is not
 * currently allowed.
 *
 * The request is rejected when the startup process has signaled the slot
 * sync machinery to stop (see comments atop SlotSyncCtxStruct), or when
 * another sync is already in progress. On success, the shared 'syncing'
 * flag is set, 'worker_pid' is stored in SlotSyncCtx->pid, and the local
 * syncing_slots flag is raised.
 */
static void
check_and_set_sync_info(pid_t worker_pid)
{
	bool		stop_requested;
	bool		sync_in_progress;

	SpinLockAcquire(&SlotSyncCtx->mutex);

	/* The worker pid must not be already assigned in SlotSyncCtx */
	Assert(worker_pid == InvalidPid || SlotSyncCtx->pid == InvalidPid);

	/* Snapshot both rejection conditions while holding the spinlock. */
	stop_requested = SlotSyncCtx->stopSignaled;
	sync_in_progress = SlotSyncCtx->syncing;

	if (!stop_requested && !sync_in_progress)
	{
		SlotSyncCtx->syncing = true;

		/*
		 * Advertise the required PID so that the startup process can kill
		 * the slot sync worker on promotion.
		 */
		SlotSyncCtx->pid = worker_pid;
	}

	SpinLockRelease(&SlotSyncCtx->mutex);

	/*
	 * Errors are reported only after the spinlock is released. The stop
	 * request takes precedence over a concurrent sync.
	 */
	if (stop_requested)
		ereport(ERROR,
				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				errmsg("cannot synchronize replication slots when standby promotion is ongoing"));

	if (sync_in_progress)
		ereport(ERROR,
				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				errmsg("cannot synchronize replication slots concurrently"));

	syncing_slots = true;
}
/*
 * Reset syncing flag.
 *
 * Clears the shared 'syncing' flag while holding the SlotSyncCtx spinlock,
 * then clears the process-local syncing_slots flag. Takes no arguments and
 * returns nothing.
 */
static void
reset_syncing_flag(void)
{
	SpinLockAcquire(&SlotSyncCtx->mutex);
	SlotSyncCtx->syncing = false;
	SpinLockRelease(&SlotSyncCtx->mutex);

	syncing_slots = false;
}
/*
* The main loop of our worker process.
*
@ -1278,47 +1357,6 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
Assert(SlotSyncCtx != NULL);
SpinLockAcquire(&SlotSyncCtx->mutex);
Assert(SlotSyncCtx->pid == InvalidPid);
/*
* Startup process signaled the slot sync worker to stop, so if meanwhile
* postmaster ended up starting the worker again, exit.
*/
if (SlotSyncCtx->stopSignaled)
{
SpinLockRelease(&SlotSyncCtx->mutex);
proc_exit(0);
}
/* Advertise our PID so that the startup process can kill us on promotion */
SlotSyncCtx->pid = MyProcPid;
SpinLockRelease(&SlotSyncCtx->mutex);
ereport(LOG, errmsg("slot sync worker started"));
/* Register it as soon as SlotSyncCtx->pid is initialized. */
before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
/* Setup signal handling */
pqsignal(SIGHUP, SignalHandlerForConfigReload);
pqsignal(SIGINT, SignalHandlerForShutdownRequest);
pqsignal(SIGTERM, die);
pqsignal(SIGFPE, FloatExceptionHandler);
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
pqsignal(SIGUSR2, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN);
pqsignal(SIGCHLD, SIG_DFL);
/*
* Establishes SIGALRM handler and initialize timeout module. It is needed
* by InitPostgres to register different timeouts.
*/
InitializeTimeouts();
/* Load the libpq-specific functions */
load_file("libpqwalreceiver", false);
/*
* If an exception is encountered, processing resumes here.
*
@ -1350,6 +1388,32 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
/* We can now handle ereport(ERROR) */
PG_exception_stack = &local_sigjmp_buf;
/* Setup signal handling */
pqsignal(SIGHUP, SignalHandlerForConfigReload);
pqsignal(SIGINT, SignalHandlerForShutdownRequest);
pqsignal(SIGTERM, die);
pqsignal(SIGFPE, FloatExceptionHandler);
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
pqsignal(SIGUSR2, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN);
pqsignal(SIGCHLD, SIG_DFL);
check_and_set_sync_info(MyProcPid);
ereport(LOG, errmsg("slot sync worker started"));
/* Register it as soon as SlotSyncCtx->pid is initialized. */
before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
/*
* Establishes SIGALRM handler and initialize timeout module. It is needed
* by InitPostgres to register different timeouts.
*/
InitializeTimeouts();
/* Load the libpq-specific functions */
load_file("libpqwalreceiver", false);
/*
* Unblock signals (they were blocked when the postmaster forked us)
*/
@ -1402,13 +1466,13 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
errmsg("could not connect to the primary server: %s", err));
/*
* Register the failure callback once we have the connection.
* Register the disconnection callback.
*
* XXX: This can be combined with previous such cleanup registration of
* XXX: This can be combined with previous cleanup registration of
* slotsync_worker_onexit() but that will need the connection to be made
* global and we want to avoid introducing global for this purpose.
*/
before_shmem_exit(slotsync_failure_callback, PointerGetDatum(wrconn));
before_shmem_exit(slotsync_worker_disconnect, PointerGetDatum(wrconn));
/*
* Using the specified primary server connection, check that we are not a
@ -1457,8 +1521,8 @@ update_synced_slots_inactive_since(void)
if (!StandbyMode)
return;
/* The slot sync worker mustn't be running by now */
Assert(SlotSyncCtx->pid == InvalidPid);
/* The slot sync worker or SQL function mustn't be running by now */
Assert((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing);
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
@ -1471,6 +1535,9 @@ update_synced_slots_inactive_since(void)
{
Assert(SlotIsLogical(s));
/* The slot must not be acquired by any process */
Assert(s->active_pid == 0);
/* Use the same inactive_since time for all the slots. */
if (now == 0)
now = GetCurrentTimestamp();
@ -1486,25 +1553,39 @@ update_synced_slots_inactive_since(void)
/*
* Shut down the slot sync worker.
*
* This function sends a signal to shut down the slot sync worker, if
* required. It also waits till the slot sync worker has exited or
* pg_sync_replication_slots() has finished.
*/
void
ShutDownSlotSync(void)
{
pid_t worker_pid;
SpinLockAcquire(&SlotSyncCtx->mutex);
SlotSyncCtx->stopSignaled = true;
if (SlotSyncCtx->pid == InvalidPid)
/*
* Return if neither the slot sync worker is running nor the function
* pg_sync_replication_slots() is executing.
*/
if (!SlotSyncCtx->syncing)
{
SpinLockRelease(&SlotSyncCtx->mutex);
update_synced_slots_inactive_since();
return;
}
worker_pid = SlotSyncCtx->pid;
SpinLockRelease(&SlotSyncCtx->mutex);
kill(SlotSyncCtx->pid, SIGINT);
if (worker_pid != InvalidPid)
kill(worker_pid, SIGINT);
/* Wait for it to die */
/* Wait for slot sync to end */
for (;;)
{
int rc;
@ -1522,8 +1603,8 @@ ShutDownSlotSync(void)
SpinLockAcquire(&SlotSyncCtx->mutex);
/* Is it gone? */
if (SlotSyncCtx->pid == InvalidPid)
/* Ensure that no process is syncing the slots. */
if (!SlotSyncCtx->syncing)
break;
SpinLockRelease(&SlotSyncCtx->mutex);
@ -1601,26 +1682,37 @@ SlotSyncShmemInit(void)
}
/*
* Error cleanup callback for slot synchronization.
* Error cleanup callback for slot sync SQL function.
*/
static void
slotsync_failure_callback(int code, Datum arg)
{
WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
if (syncing_slots)
{
/*
* If syncing_slots is true, it indicates that the process errored out
* without resetting the flag. So, we need to clean up shared memory
* and reset the flag here.
*/
SpinLockAcquire(&SlotSyncCtx->mutex);
SlotSyncCtx->syncing = false;
SpinLockRelease(&SlotSyncCtx->mutex);
/*
* We need to do slots cleanup here just like WalSndErrorCleanup() does.
*
* The startup process during promotion invokes ShutDownSlotSync() which
* waits for slot sync to finish and it does that by checking the
* 'syncing' flag. Thus the SQL function must be done with slots' release
* and cleanup to avoid any dangling temporary slots or active slots
* before it marks itself as finished syncing.
*/
syncing_slots = false;
}
/* Make sure active replication slots are released */
if (MyReplicationSlot != NULL)
ReplicationSlotRelease();
/* Also cleanup the synced temporary slots. */
ReplicationSlotCleanup(true);
/*
* The set syncing_slots indicates that the process errored out without
* resetting the flag. So, we need to clean up shared memory and reset the
* flag here.
*/
if (syncing_slots)
reset_syncing_flag();
walrcv_disconnect(wrconn);
}
@ -1634,9 +1726,17 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
{
PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
{
check_and_set_sync_info(InvalidPid);
validate_remote_info(wrconn);
synchronize_slots(wrconn);
/* Cleanup the synced temporary slots */
ReplicationSlotCleanup(true);
/* We are done with sync, so reset sync flag */
reset_syncing_flag();
}
PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
}

View File

@ -237,7 +237,7 @@ ReplicationSlotShmemExit(int code, Datum arg)
ReplicationSlotRelease();
/* Also cleanup all the temporary slots. */
ReplicationSlotCleanup();
ReplicationSlotCleanup(false);
}
/*
@ -736,10 +736,13 @@ ReplicationSlotRelease(void)
}
/*
* Cleanup all temporary slots created in current session.
* Cleanup temporary slots created in current session.
*
* Cleanup only synced temporary slots if 'synced_only' is true, else
* cleanup all temporary slots.
*/
void
ReplicationSlotCleanup(void)
ReplicationSlotCleanup(bool synced_only)
{
int i;
@ -755,7 +758,8 @@ restart:
continue;
SpinLockAcquire(&s->mutex);
if (s->active_pid == MyProcPid)
if ((s->active_pid == MyProcPid &&
(!synced_only || s->data.synced)))
{
Assert(s->data.persistency == RS_TEMPORARY);
SpinLockRelease(&s->mutex);

View File

@ -336,7 +336,7 @@ WalSndErrorCleanup(void)
if (MyReplicationSlot != NULL)
ReplicationSlotRelease();
ReplicationSlotCleanup();
ReplicationSlotCleanup(false);
replication_active = false;

View File

@ -4410,7 +4410,7 @@ PostgresMain(const char *dbname, const char *username)
ReplicationSlotRelease();
/* We also want to cleanup temporary slots on error. */
ReplicationSlotCleanup();
ReplicationSlotCleanup(false);
jit_reset_after_error();

View File

@ -247,7 +247,7 @@ extern void ReplicationSlotAlter(const char *name, bool failover);
extern void ReplicationSlotAcquire(const char *name, bool nowait);
extern void ReplicationSlotRelease(void);
extern void ReplicationSlotCleanup(void);
extern void ReplicationSlotCleanup(bool synced_only);
extern void ReplicationSlotSave(void);
extern void ReplicationSlotMarkDirty(void);