From 1632ea43682fcea8836ea245771ae85b9e1bcd38 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera Date: Fri, 11 Jun 2021 15:48:26 -0400 Subject: [PATCH] Return ReplicationSlotAcquire API to its original form MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per 96540f80f833; the awkward API introduced by c6550776394e is no longer needed. Author: Andres Freund Reviewed-by: Álvaro Herrera Discussion: https://postgr.es/m/20210408020913.zzprrlvqyvlt5cyy@alap3.anarazel.de --- .../replication/logical/logicalfuncs.c | 2 +- src/backend/replication/slot.c | 53 +++++-------------- src/backend/replication/slotfuncs.c | 2 +- src/backend/replication/walsender.c | 4 +- src/include/replication/slot.h | 10 +--- 5 files changed, 18 insertions(+), 53 deletions(-) diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 01d354829b..1f38c5b33e 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -225,7 +225,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin else end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID); - (void) ReplicationSlotAcquire(NameStr(*name), SAB_Error); + ReplicationSlotAcquire(NameStr(*name), true); PG_TRY(); { diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 5a0bad97f4..a9a06b9a38 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -99,8 +99,6 @@ ReplicationSlot *MyReplicationSlot = NULL; int max_replication_slots = 0; /* the maximum number of replication * slots */ -static int ReplicationSlotAcquireInternal(ReplicationSlot *slot, - const char *name, SlotAcquireBehavior behavior); static void ReplicationSlotDropAcquired(void); static void ReplicationSlotDropPtr(ReplicationSlot *slot); @@ -374,34 +372,16 @@ SearchNamedReplicationSlot(const char *name, bool need_lock) /* * Find a previously created slot and mark it as used by this process. * - * The return value is only useful if behavior is SAB_Inquire, in which - * it's zero if we successfully acquired the slot, -1 if the slot no longer - * exists, or the PID of the owning process otherwise. If behavior is - * SAB_Error, then trying to acquire an owned slot is an error. - * If SAB_Block, we sleep until the slot is released by the owning process. + * An error is raised if nowait is true and the slot is currently in use. If + * nowait is false, we sleep until the slot is released by the owning process. */ -int -ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior) -{ - return ReplicationSlotAcquireInternal(NULL, name, behavior); -} - -/* - * Mark the specified slot as used by this process. - * - * Only one of slot and name can be specified. - * If slot == NULL, search for the slot with the given name. - * - * See comments about the return value in ReplicationSlotAcquire(). - */ -static int -ReplicationSlotAcquireInternal(ReplicationSlot *slot, const char *name, - SlotAcquireBehavior behavior) +void +ReplicationSlotAcquire(const char *name, bool nowait) { ReplicationSlot *s; int active_pid; - AssertArg((slot == NULL) ^ (name == NULL)); + AssertArg(name != NULL); retry: Assert(MyReplicationSlot == NULL); @@ -412,17 +392,15 @@ retry: * Search for the slot with the specified name if the slot to acquire is * not given. If the slot is not found, we either return -1 or error out. */ - s = slot ? slot : SearchNamedReplicationSlot(name, false); + s = SearchNamedReplicationSlot(name, false); if (s == NULL || !s->in_use) { LWLockRelease(ReplicationSlotControlLock); - if (behavior == SAB_Inquire) - return -1; ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("replication slot \"%s\" does not exist", - name ? name : NameStr(slot->data.name)))); + name))); } /* @@ -436,7 +414,7 @@ retry: * (We may end up not sleeping, but we don't want to do this while * holding the spinlock.) */ - if (behavior == SAB_Block) + if (!nowait) ConditionVariablePrepareToSleep(&s->active_cv); SpinLockAcquire(&s->mutex); @@ -456,13 +434,11 @@ retry: */ if (active_pid != MyProcPid) { - if (behavior == SAB_Error) + if (!nowait) ereport(ERROR, (errcode(ERRCODE_OBJECT_IN_USE), errmsg("replication slot \"%s\" is active for PID %d", NameStr(s->data.name), active_pid))); - else if (behavior == SAB_Inquire) - return active_pid; /* Wait here until we get signaled, and then restart */ ConditionVariableSleep(&s->active_cv, @@ -470,7 +446,7 @@ retry: ConditionVariableCancelSleep(); goto retry; } - else if (behavior == SAB_Block) + else if (!nowait) ConditionVariableCancelSleep(); /* no sleep needed after all */ /* Let everybody know we've modified this slot */ @@ -478,9 +454,6 @@ retry: /* We made this slot active, so it's ours now. */ MyReplicationSlot = s; - - /* success */ - return 0; } /* @@ -588,7 +561,7 @@ ReplicationSlotDrop(const char *name, bool nowait) { Assert(MyReplicationSlot == NULL); - (void) ReplicationSlotAcquire(name, nowait ? SAB_Error : SAB_Block); + ReplicationSlotAcquire(name, nowait); ReplicationSlotDropAcquired(); } @@ -1271,8 +1244,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN) WAIT_EVENT_REPLICATION_SLOT_DROP); /* - * Re-acquire lock and start over; we expect to invalidate the slot - * next time (unless another process acquires the slot in the + * Re-acquire lock and start over; we expect to invalidate the + * slot next time (unless another process acquires the slot in the * meantime). */ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index e4e6632f82..31e74d3832 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -639,7 +639,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS) moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID)); /* Acquire the slot so we "own" it */ - (void) ReplicationSlotAcquire(NameStr(*slotname), SAB_Error); + ReplicationSlotAcquire(NameStr(*slotname), true); /* A slot whose restart_lsn has never been reserved cannot be advanced */ if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn)) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 109c723f4e..3224536356 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -601,7 +601,7 @@ StartReplication(StartReplicationCmd *cmd) if (cmd->slotname) { - (void) ReplicationSlotAcquire(cmd->slotname, SAB_Error); + ReplicationSlotAcquire(cmd->slotname, true); if (SlotIsLogical(MyReplicationSlot)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), @@ -1137,7 +1137,7 @@ StartLogicalReplication(StartReplicationCmd *cmd) Assert(!MyReplicationSlot); - (void) ReplicationSlotAcquire(cmd->slotname, SAB_Error); + ReplicationSlotAcquire(cmd->slotname, true); if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn)) ereport(ERROR, diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 357068403a..2eb7e3a530 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -37,14 +37,6 @@ typedef enum ReplicationSlotPersistency RS_TEMPORARY } ReplicationSlotPersistency; -/* For ReplicationSlotAcquire, q.v. */ -typedef enum SlotAcquireBehavior -{ - SAB_Error, - SAB_Block, - SAB_Inquire -} SlotAcquireBehavior; - /* * On-Disk data of a replication slot, preserved across restarts. */ @@ -208,7 +200,7 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific, extern void ReplicationSlotPersist(void); extern void ReplicationSlotDrop(const char *name, bool nowait); -extern int ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior); +extern void ReplicationSlotAcquire(const char *name, bool nowait); extern void ReplicationSlotRelease(void); extern void ReplicationSlotCleanup(void); extern void ReplicationSlotSave(void);