Fix race condition in invalidating obsolete replication slots

The code added in commit c655077639 to mark replication slots invalid
had a race condition: a slot could be dropped or advanced concurrently
while the checkpointer was trying to invalidate it.  Rewrite the code
to close those races.
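
The shape of the fix, reduced to a stand-alone sketch (illustration only, not PostgreSQL code: plain pthreads stand in for the LWLock, spinlock and condition variable, and FakeSlot, invalidate_if_obsolete and the other names are made up): every decision about the slot is taken while holding the lock, and after any wait that gives the lock up, the slot's state is re-read before acting on it, since it may have been dropped, advanced or invalidated in the meantime.  In the patch itself this logic lives in the new InvalidatePossiblyObsoleteSlot() helper shown in the diff below.

    #include <pthread.h>
    #include <stdbool.h>
    #include <stdio.h>

    typedef struct FakeSlot
    {
        pthread_mutex_t lock;           /* stands in for the slot-related locks */
        pthread_cond_t  released_cv;    /* stands in for the slot's active_cv */
        bool            in_use;         /* slot still exists */
        int             active_pid;     /* 0 when nobody has the slot acquired */
        long            restart_lsn;    /* 0 means "already invalidated" */
    } FakeSlot;

    /*
     * Invalidate the slot if its restart_lsn is older than oldest_lsn.
     * Whenever the lock has been given up (while waiting for the owner to
     * let go), the slot is re-checked from scratch before anything is done
     * to it.
     */
    static void
    invalidate_if_obsolete(FakeSlot *s, long oldest_lsn)
    {
        pthread_mutex_lock(&s->lock);
        for (;;)
        {
            /* Re-validate under the lock before acting. */
            if (!s->in_use ||                   /* dropped concurrently */
                s->restart_lsn == 0 ||          /* already invalidated */
                s->restart_lsn >= oldest_lsn)   /* advanced past the cutoff */
                break;

            if (s->active_pid == 0)
            {
                /* Nobody owns it: invalidate while still holding the lock. */
                s->restart_lsn = 0;
                printf("slot invalidated\n");
                break;
            }

            /*
             * Someone owns the slot.  (The real code signals that process
             * with SIGTERM at this point.)  Waiting drops the lock, so loop
             * around and re-check everything once we wake up.
             */
            pthread_cond_wait(&s->released_cv, &s->lock);
        }
        pthread_mutex_unlock(&s->lock);
    }

    int
    main(void)
    {
        FakeSlot    slot = {
            PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER,
            true, 0, 100
        };

        invalidate_if_obsolete(&slot, 200);     /* 100 < 200: invalidated */
        invalidate_if_obsolete(&slot, 200);     /* already invalid: no-op */
        return 0;
    }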

The changes that commit c655077639 made to ReplicationSlotAcquire's API
are no longer necessary.  To avoid an ABI break in released branches,
this commit leaves that API unchanged; it will be changed separately in
a master-only commit.
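
On the ABI point, the usual back-branch approach is to keep every exported signature exactly as released and confine the changes to internals; a generic illustration of that pattern (slot_acquire, slot_acquire_internal and their arguments are hypothetical, not the real ReplicationSlotAcquire declarations):

    #include <stdbool.h>

    /* New internal function carrying the extra behaviour. */
    static int
    slot_acquire_internal(const char *name, bool nowait, bool check_only)
    {
        /* ... real work would go here ... */
        (void) name;
        (void) nowait;
        (void) check_only;
        return 0;
    }

    /* Exported entry point: signature kept unchanged, so extensions built
     * against the released branch keep linking and calling it safely. */
    int
    slot_acquire(const char *name, bool nowait)
    {
        return slot_acquire_internal(name, nowait, false);
    }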

Backpatch to 13, where this code first appeared.

Reported-by: Andres Freund <andres@anarazel.de>
Author: Andres Freund <andres@anarazel.de>
Author: Álvaro Herrera <alvherre@alvh.no-ip.org>
Discussion: https://postgr.es/m/20210408001037.wfmk6jud36auhfqm@alap3.anarazel.de
Alvaro Herrera 2021-06-11 12:16:14 -04:00
parent 6e43f1c2df
commit 218b101008
1 changed file with 155 additions and 88 deletions


@@ -1123,6 +1123,158 @@ ReplicationSlotReserveWal(void)
     }
 }
 
+/*
+ * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
+ * and mark it invalid, if necessary and possible.
+ *
+ * Returns whether ReplicationSlotControlLock was released in the interim (and
+ * in that case we're not holding the lock at return, otherwise we are).
+ *
+ * This is inherently racy, because we release the LWLock
+ * for syscalls, so caller must restart if we return true.
+ */
+static bool
+InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN)
+{
+    int         last_signaled_pid = 0;
+    bool        released_lock = false;
+
+    for (;;)
+    {
+        XLogRecPtr  restart_lsn;
+        NameData    slotname;
+        int         active_pid = 0;
+
+        Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
+
+        if (!s->in_use)
+        {
+            if (released_lock)
+                LWLockRelease(ReplicationSlotControlLock);
+            break;
+        }
+
+        /*
+         * Check if the slot needs to be invalidated. If it needs to be
+         * invalidated, and is not currently acquired, acquire it and mark it
+         * as having been invalidated. We do this with the spinlock held to
+         * avoid race conditions -- for example the restart_lsn could move
+         * forward, or the slot could be dropped.
+         */
+        SpinLockAcquire(&s->mutex);
+
+        restart_lsn = s->data.restart_lsn;
+
+        /*
+         * If the slot is already invalid or is fresh enough, we don't need to
+         * do anything.
+         */
+        if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
+        {
+            SpinLockRelease(&s->mutex);
+            if (released_lock)
+                LWLockRelease(ReplicationSlotControlLock);
+            break;
+        }
+
+        slotname = s->data.name;
+        active_pid = s->active_pid;
+
+        /*
+         * If the slot can be acquired, do so and mark it invalidated
+         * immediately. Otherwise we'll signal the owning process, below, and
+         * retry.
+         */
+        if (active_pid == 0)
+        {
+            MyReplicationSlot = s;
+            s->active_pid = MyProcPid;
+            s->data.invalidated_at = restart_lsn;
+            s->data.restart_lsn = InvalidXLogRecPtr;
+        }
+
+        SpinLockRelease(&s->mutex);
+
+        if (active_pid != 0)
+        {
+            /*
+             * Prepare the sleep on the slot's condition variable before
+             * releasing the lock, to close a possible race condition if the
+             * slot is released before the sleep below.
+             */
+            ConditionVariablePrepareToSleep(&s->active_cv);
+
+            LWLockRelease(ReplicationSlotControlLock);
+            released_lock = true;
+
+            /*
+             * Signal to terminate the process that owns the slot, if we
+             * haven't already signalled it. (Avoidance of repeated
+             * signalling is the only reason for there to be a loop in this
+             * routine; otherwise we could rely on caller's restart loop.)
+             *
+             * There is the race condition that other process may own the slot
+             * after its current owner process is terminated and before this
+             * process owns it. To handle that, we signal only if the PID of
+             * the owning process has changed from the previous time. (This
+             * logic assumes that the same PID is not reused very quickly.)
+             */
+            if (last_signaled_pid != active_pid)
+            {
+                ereport(LOG,
+                        (errmsg("terminating process %d to release replication slot \"%s\"",
+                                active_pid, NameStr(slotname))));
+
+                (void) kill(active_pid, SIGTERM);
+                last_signaled_pid = active_pid;
+            }
+
+            /* Wait until the slot is released. */
+            ConditionVariableSleep(&s->active_cv,
+                                   WAIT_EVENT_REPLICATION_SLOT_DROP);
+
+            /*
+             * Re-acquire lock and start over; we expect to invalidate the slot
+             * next time (unless another process acquires the slot in the
+             * meantime).
+             */
+            LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+            continue;
+        }
+        else
+        {
+            /*
+             * We hold the slot now and have already invalidated it; flush it
+             * to ensure that state persists.
+             *
+             * Don't want to hold ReplicationSlotControlLock across file
+             * system operations, so release it now but be sure to tell caller
+             * to restart from scratch.
+             */
+            LWLockRelease(ReplicationSlotControlLock);
+            released_lock = true;
+
+            /* Make sure the invalidated state persists across server restart */
+            ReplicationSlotMarkDirty();
+            ReplicationSlotSave();
+            ReplicationSlotRelease();
+
+            ereport(LOG,
+                    (errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
+                            NameStr(slotname),
+                            (uint32) (restart_lsn >> 32),
+                            (uint32) restart_lsn)));
+
+            /* done with this slot for now */
+            break;
+        }
+    }
+
+    Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
+
+    return released_lock;
+}
+
 /*
  * Mark any slot that points to an LSN older than the given segment
  * as invalid; it requires WAL that's about to be removed.
@@ -1141,100 +1293,15 @@ restart:
     for (int i = 0; i < max_replication_slots; i++)
     {
         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
-        XLogRecPtr  restart_lsn = InvalidXLogRecPtr;
-        NameData    slotname;
-        int         wspid;
-        int         last_signaled_pid = 0;
 
         if (!s->in_use)
             continue;
 
-        SpinLockAcquire(&s->mutex);
-        slotname = s->data.name;
-        restart_lsn = s->data.restart_lsn;
-        SpinLockRelease(&s->mutex);
-
-        if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
-            continue;
-        LWLockRelease(ReplicationSlotControlLock);
-        CHECK_FOR_INTERRUPTS();
-
-        /* Get ready to sleep on the slot in case it is active */
-        ConditionVariablePrepareToSleep(&s->active_cv);
-
-        for (;;)
+        if (InvalidatePossiblyObsoleteSlot(s, oldestLSN))
         {
-            /*
-             * Try to mark this slot as used by this process.
-             *
-             * Note that ReplicationSlotAcquireInternal(SAB_Inquire)
-             * should not cancel the prepared condition variable
-             * if this slot is active in other process. Because in this case
-             * we have to wait on that CV for the process owning
-             * the slot to be terminated, later.
-             */
-            wspid = ReplicationSlotAcquireInternal(s, NULL, SAB_Inquire);
-
-            /*
-             * Exit the loop if we successfully acquired the slot or
-             * the slot was dropped during waiting for the owning process
-             * to be terminated. For example, the latter case is likely to
-             * happen when the slot is temporary because it's automatically
-             * dropped by the termination of the owning process.
-             */
-            if (wspid <= 0)
-                break;
-
-            /*
-             * Signal to terminate the process that owns the slot.
-             *
-             * There is the race condition where other process may own
-             * the slot after the process using it was terminated and before
-             * this process owns it. To handle this case, we signal again
-             * if the PID of the owning process is changed than the last.
-             *
-             * XXX This logic assumes that the same PID is not reused
-             * very quickly.
-             */
-            if (last_signaled_pid != wspid)
-            {
-                ereport(LOG,
-                        (errmsg("terminating process %d because replication slot \"%s\" is too far behind",
-                                wspid, NameStr(slotname))));
-                (void) kill(wspid, SIGTERM);
-                last_signaled_pid = wspid;
-            }
-
-            ConditionVariableTimedSleep(&s->active_cv, 10,
-                                        WAIT_EVENT_REPLICATION_SLOT_DROP);
-        }
-        ConditionVariableCancelSleep();
-
-        /*
-         * Do nothing here and start from scratch if the slot has
-         * already been dropped.
-         */
-        if (wspid == -1)
+            /* if the lock was released, start from scratch */
             goto restart;
-
-        ereport(LOG,
-                (errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
-                        NameStr(slotname),
-                        (uint32) (restart_lsn >> 32),
-                        (uint32) restart_lsn)));
-
-        SpinLockAcquire(&s->mutex);
-        s->data.invalidated_at = s->data.restart_lsn;
-        s->data.restart_lsn = InvalidXLogRecPtr;
-        SpinLockRelease(&s->mutex);
-
-        /* Make sure the invalidated state persists across server restart */
-        ReplicationSlotMarkDirty();
-        ReplicationSlotSave();
-        ReplicationSlotRelease();
-
-        /* if we did anything, start from scratch */
-        goto restart;
+        }
     }
     LWLockRelease(ReplicationSlotControlLock);
 }
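
A note on the condition-variable handling in the new helper: PostgreSQL's ConditionVariable API (src/include/storage/condition_variable.h) is built around a prepare/sleep/cancel pattern, which is why InvalidatePossiblyObsoleteSlot() calls ConditionVariablePrepareToSleep() before releasing ReplicationSlotControlLock -- otherwise a wakeup arriving between the check and the sleep could be missed.  Roughly, the canonical usage looks like this (sketch only; the condition tested is a placeholder for "the slot is still owned"):

    ConditionVariablePrepareToSleep(&s->active_cv);
    while (s->active_pid != 0)          /* re-check the condition after each wakeup */
        ConditionVariableSleep(&s->active_cv,
                               WAIT_EVENT_REPLICATION_SLOT_DROP);
    ConditionVariableCancelSleep();

The helper differs from this canonical loop in that, after each sleep, it reacquires ReplicationSlotControlLock and goes back to its own outer loop instead of spinning here, so the slot is fully re-validated every time.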