From e5bcbb10707b844471c67d5e5fd8226d1891ee97 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera Date: Fri, 16 Jul 2021 12:07:30 -0400 Subject: [PATCH] Advance old-segment horizon properly after slot invalidation When some slots are invalidated due to the max_slot_wal_keep_size limit, the old segment horizon should move forward to stay within the limit. However, in commit c6550776394e we forgot to call KeepLogSeg again to recompute the horizon after invalidating replication slots. In cases where other slots remained, the limits would be recomputed eventually for other reasons, but if all slots were invalidated, the limits would not move at all afterwards. Repair. Backpatch to 13 where the feature was introduced. Author: Kyotaro Horiguchi Reported-by: Marcin Krupowicz Discussion: https://postgr.es/m/17103-004130e8f27782c9@postgresql.org --- src/backend/access/transam/xlog.c | 26 ++++++++++++++++-- src/backend/replication/slot.c | 26 +++++++++++++++--- src/include/replication/slot.h | 2 +- src/test/recovery/t/019_replslot_limit.pl | 32 ++++++++++++++++++----- 4 files changed, 74 insertions(+), 12 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index c64ed561f9..195b840d6f 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -9262,7 +9262,15 @@ CreateCheckPoint(int flags) */ XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size); KeepLogSeg(recptr, &_logSegNo); - InvalidateObsoleteReplicationSlots(_logSegNo); + if (InvalidateObsoleteReplicationSlots(_logSegNo)) + { + /* + * Some slots have been invalidated; recalculate the old-segment + * horizon, starting again from RedoRecPtr. + */ + XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size); + KeepLogSeg(recptr, &_logSegNo); + } _logSegNo--; RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr); @@ -9602,7 +9610,15 @@ CreateRestartPoint(int flags) replayPtr = GetXLogReplayRecPtr(&replayTLI); endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr; KeepLogSeg(endptr, &_logSegNo); - InvalidateObsoleteReplicationSlots(_logSegNo); + if (InvalidateObsoleteReplicationSlots(_logSegNo)) + { + /* + * Some slots have been invalidated; recalculate the old-segment + * horizon, starting again from RedoRecPtr. + */ + XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size); + KeepLogSeg(endptr, &_logSegNo); + } _logSegNo--; /* @@ -9771,6 +9787,12 @@ GetWALAvailability(XLogRecPtr targetLSN) * requirement of replication slots. For the latter criterion we do consider * the effects of max_slot_wal_keep_size: reserve at most that much space back * from recptr. + * + * Note about replication slots: if this function calculates a value + * that's further ahead than what slots need reserved, then affected + * slots need to be invalidated and this function invoked again. + * XXX it might be a good idea to rewrite this function so that + * invalidation is optionally done here, instead. */ static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo) diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 8c18b4ed05..d2a145e122 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -1142,11 +1142,14 @@ ReplicationSlotReserveWal(void) * Returns whether ReplicationSlotControlLock was released in the interim (and * in that case we're not holding the lock at return, otherwise we are). * + * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.) + * * This is inherently racy, because we release the LWLock * for syscalls, so caller must restart if we return true. */ static bool -InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN) +InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, + bool *invalidated) { int last_signaled_pid = 0; bool released_lock = false; @@ -1203,6 +1206,9 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN) s->active_pid = MyProcPid; s->data.invalidated_at = restart_lsn; s->data.restart_lsn = InvalidXLogRecPtr; + + /* Let caller know */ + *invalidated = true; } SpinLockRelease(&s->mutex); @@ -1290,12 +1296,15 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN) * Mark any slot that points to an LSN older than the given segment * as invalid; it requires WAL that's about to be removed. * + * Returns true when any slot have got invalidated. + * * NB - this runs as part of checkpoint, so avoid raising errors if possible. */ -void +bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno) { XLogRecPtr oldestLSN; + bool invalidated = false; XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN); @@ -1308,13 +1317,24 @@ restart: if (!s->in_use) continue; - if (InvalidatePossiblyObsoleteSlot(s, oldestLSN)) + if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated)) { /* if the lock was released, start from scratch */ goto restart; } } LWLockRelease(ReplicationSlotControlLock); + + /* + * If any slots have been invalidated, recalculate the resource limits. + */ + if (invalidated) + { + ReplicationSlotsComputeRequiredXmin(false); + ReplicationSlotsComputeRequiredLSN(); + } + + return invalidated; } /* diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 2eb7e3a530..c0fca56bee 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -214,7 +214,7 @@ extern void ReplicationSlotsComputeRequiredLSN(void); extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive); extern void ReplicationSlotsDropDBSlots(Oid dboid); -extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno); +extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno); extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock); extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, int szslot); extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok); diff --git a/src/test/recovery/t/019_replslot_limit.pl b/src/test/recovery/t/019_replslot_limit.pl index d4b9ff705f..f1a00c67f2 100644 --- a/src/test/recovery/t/019_replslot_limit.pl +++ b/src/test/recovery/t/019_replslot_limit.pl @@ -11,7 +11,7 @@ use TestLib; use PostgresNode; use File::Path qw(rmtree); -use Test::More tests => $TestLib::windows_os ? 14 : 18; +use Test::More tests => $TestLib::windows_os ? 15 : 19; use Time::HiRes qw(usleep); $ENV{PGDATABASE} = 'postgres'; @@ -176,7 +176,12 @@ ok( !find_in_log( # Advance WAL again, the slot loses the oldest segment. my $logstart = get_log_size($node_primary); advance_wal($node_primary, 7); -$node_primary->safe_psql('postgres', "CHECKPOINT;"); + +# This slot should be broken, wait for that to happen +$node_primary->poll_query_until( + 'postgres', + qq[SELECT wal_status = 'lost' FROM pg_replication_slots + WHERE slot_name = 'rep1']); # WARNING should be issued ok( find_in_log( @@ -185,13 +190,28 @@ ok( find_in_log( $logstart), 'check that the warning is logged'); -# This slot should be broken -$result = $node_primary->safe_psql('postgres', - "SELECT slot_name, active, restart_lsn IS NULL, wal_status, safe_wal_size FROM pg_replication_slots WHERE slot_name = 'rep1'" -); +$result = $node_primary->safe_psql( + 'postgres', + qq[ + SELECT slot_name, active, restart_lsn IS NULL, wal_status, safe_wal_size + FROM pg_replication_slots WHERE slot_name = 'rep1']); is($result, "rep1|f|t|lost|", 'check that the slot became inactive and the state "lost" persists'); +# The invalidated slot shouldn't keep the old-segment horizon back; +# see bug #17103: https://postgr.es/m/17103-004130e8f27782c9@postgresql.org +# Test for this by creating a new slot and comparing its restart LSN +# to the oldest existing file. +my $redoseg = $node_primary->safe_psql('postgres', + "SELECT pg_walfile_name(lsn) FROM pg_create_physical_replication_slot('s2', true)" +); +my $oldestseg = $node_primary->safe_psql('postgres', + "SELECT pg_ls_dir AS f FROM pg_ls_dir('pg_wal') WHERE pg_ls_dir ~ '^[0-9A-F]{24}\$' ORDER BY 1 LIMIT 1" +); +$node_primary->safe_psql('postgres', + qq[SELECT pg_drop_replication_slot('s2')]); +is($oldestseg, $redoseg, "check that segments have been removed"); + # The standby no longer can connect to the primary $logstart = get_log_size($node_standby); $node_standby->start;