diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index c64ed561f9..195b840d6f 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -9262,7 +9262,15 @@ CreateCheckPoint(int flags) */ XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size); KeepLogSeg(recptr, &_logSegNo); - InvalidateObsoleteReplicationSlots(_logSegNo); + if (InvalidateObsoleteReplicationSlots(_logSegNo)) + { + /* + * Some slots have been invalidated; recalculate the old-segment + * horizon, starting again from RedoRecPtr. + */ + XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size); + KeepLogSeg(recptr, &_logSegNo); + } _logSegNo--; RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr); @@ -9602,7 +9610,15 @@ CreateRestartPoint(int flags) replayPtr = GetXLogReplayRecPtr(&replayTLI); endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr; KeepLogSeg(endptr, &_logSegNo); - InvalidateObsoleteReplicationSlots(_logSegNo); + if (InvalidateObsoleteReplicationSlots(_logSegNo)) + { + /* + * Some slots have been invalidated; recalculate the old-segment + * horizon, starting again from RedoRecPtr. + */ + XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size); + KeepLogSeg(endptr, &_logSegNo); + } _logSegNo--; /* @@ -9771,6 +9787,12 @@ GetWALAvailability(XLogRecPtr targetLSN) * requirement of replication slots. For the latter criterion we do consider * the effects of max_slot_wal_keep_size: reserve at most that much space back * from recptr. + * + * Note about replication slots: if this function calculates a value + * that's further ahead than what slots need reserved, then affected + * slots need to be invalidated and this function invoked again. + * XXX it might be a good idea to rewrite this function so that + * invalidation is optionally done here, instead. */ static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo) diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 8c18b4ed05..d2a145e122 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -1142,11 +1142,14 @@ ReplicationSlotReserveWal(void) * Returns whether ReplicationSlotControlLock was released in the interim (and * in that case we're not holding the lock at return, otherwise we are). * + * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.) + * * This is inherently racy, because we release the LWLock * for syscalls, so caller must restart if we return true. */ static bool -InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN) +InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, + bool *invalidated) { int last_signaled_pid = 0; bool released_lock = false; @@ -1203,6 +1206,9 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN) s->active_pid = MyProcPid; s->data.invalidated_at = restart_lsn; s->data.restart_lsn = InvalidXLogRecPtr; + + /* Let caller know */ + *invalidated = true; } SpinLockRelease(&s->mutex); @@ -1290,12 +1296,15 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN) * Mark any slot that points to an LSN older than the given segment * as invalid; it requires WAL that's about to be removed. * + * Returns true when any slot have got invalidated. + * * NB - this runs as part of checkpoint, so avoid raising errors if possible. */ -void +bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno) { XLogRecPtr oldestLSN; + bool invalidated = false; XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN); @@ -1308,13 +1317,24 @@ restart: if (!s->in_use) continue; - if (InvalidatePossiblyObsoleteSlot(s, oldestLSN)) + if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated)) { /* if the lock was released, start from scratch */ goto restart; } } LWLockRelease(ReplicationSlotControlLock); + + /* + * If any slots have been invalidated, recalculate the resource limits. + */ + if (invalidated) + { + ReplicationSlotsComputeRequiredXmin(false); + ReplicationSlotsComputeRequiredLSN(); + } + + return invalidated; } /* diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 2eb7e3a530..c0fca56bee 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -214,7 +214,7 @@ extern void ReplicationSlotsComputeRequiredLSN(void); extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive); extern void ReplicationSlotsDropDBSlots(Oid dboid); -extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno); +extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno); extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock); extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, int szslot); extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok); diff --git a/src/test/recovery/t/019_replslot_limit.pl b/src/test/recovery/t/019_replslot_limit.pl index d4b9ff705f..f1a00c67f2 100644 --- a/src/test/recovery/t/019_replslot_limit.pl +++ b/src/test/recovery/t/019_replslot_limit.pl @@ -11,7 +11,7 @@ use TestLib; use PostgresNode; use File::Path qw(rmtree); -use Test::More tests => $TestLib::windows_os ? 14 : 18; +use Test::More tests => $TestLib::windows_os ? 15 : 19; use Time::HiRes qw(usleep); $ENV{PGDATABASE} = 'postgres'; @@ -176,7 +176,12 @@ ok( !find_in_log( # Advance WAL again, the slot loses the oldest segment. my $logstart = get_log_size($node_primary); advance_wal($node_primary, 7); -$node_primary->safe_psql('postgres', "CHECKPOINT;"); + +# This slot should be broken, wait for that to happen +$node_primary->poll_query_until( + 'postgres', + qq[SELECT wal_status = 'lost' FROM pg_replication_slots + WHERE slot_name = 'rep1']); # WARNING should be issued ok( find_in_log( @@ -185,13 +190,28 @@ ok( find_in_log( $logstart), 'check that the warning is logged'); -# This slot should be broken -$result = $node_primary->safe_psql('postgres', - "SELECT slot_name, active, restart_lsn IS NULL, wal_status, safe_wal_size FROM pg_replication_slots WHERE slot_name = 'rep1'" -); +$result = $node_primary->safe_psql( + 'postgres', + qq[ + SELECT slot_name, active, restart_lsn IS NULL, wal_status, safe_wal_size + FROM pg_replication_slots WHERE slot_name = 'rep1']); is($result, "rep1|f|t|lost|", 'check that the slot became inactive and the state "lost" persists'); +# The invalidated slot shouldn't keep the old-segment horizon back; +# see bug #17103: https://postgr.es/m/17103-004130e8f27782c9@postgresql.org +# Test for this by creating a new slot and comparing its restart LSN +# to the oldest existing file. +my $redoseg = $node_primary->safe_psql('postgres', + "SELECT pg_walfile_name(lsn) FROM pg_create_physical_replication_slot('s2', true)" +); +my $oldestseg = $node_primary->safe_psql('postgres', + "SELECT pg_ls_dir AS f FROM pg_ls_dir('pg_wal') WHERE pg_ls_dir ~ '^[0-9A-F]{24}\$' ORDER BY 1 LIMIT 1" +); +$node_primary->safe_psql('postgres', + qq[SELECT pg_drop_replication_slot('s2')]); +is($oldestseg, $redoseg, "check that segments have been removed"); + # The standby no longer can connect to the primary $logstart = get_log_size($node_standby); $node_standby->start;