From a11f330b5584f2430371d68871e00f5c63735299 Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Mon, 25 Mar 2024 16:34:33 +0530
Subject: [PATCH] Track last_inactive_time in pg_replication_slots.

This commit adds a new property called last_inactive_time for slots. It is
set to 0 whenever a slot is made active/acquired and set to the current
timestamp whenever the slot is inactive/released or restored from the disk.
Note that we don't set the last_inactive_time for the slots currently being
synced from the primary to the standby because such slots are typically
inactive as decoding is not allowed on those.

The 'last_inactive_time' will be useful on production servers to debug and
analyze inactive replication slots. It will also help to know the lifetime
of a replication slot - one can know how long a streaming standby, logical
subscriber, or replication slot consumer is down.

The 'last_inactive_time' will also be useful to implement inactive
timeout-based replication slot invalidation in a future commit.

Author: Bharath Rupireddy
Reviewed-by: Bertrand Drouvot, Amit Kapila, Shveta Malik
Discussion: https://www.postgresql.org/message-id/CALj2ACW4aUe-_uFQOjdWCEN-xXoLGhmvRFnL8SNw_TZ5nJe+aw@mail.gmail.com
---
 doc/src/sgml/system-views.sgml            |  10 ++
 src/backend/catalog/system_views.sql      |   1 +
 src/backend/replication/slot.c            |  35 +++++
 src/backend/replication/slotfuncs.c       |   7 +-
 src/include/catalog/catversion.h          |   2 +-
 src/include/catalog/pg_proc.dat           |   6 +-
 src/include/replication/slot.h            |   3 +
 src/test/recovery/t/019_replslot_limit.pl | 152 ++++++++++++++++++++++
 src/test/regress/expected/rules.out       |   3 +-
 9 files changed, 213 insertions(+), 6 deletions(-)

diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index b5da476c20..5f4165a945 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2523,6 +2523,16 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>last_inactive_time</structfield> <type>timestamptz</type>
+      </para>
+      <para>
+        The time at which the slot became inactive.
+        <literal>NULL</literal> if the slot is currently being used.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>conflicting</structfield> <type>bool</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index f69b7f5580..bc70ff193e 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1023,6 +1023,7 @@ CREATE VIEW pg_replication_slots AS
             L.wal_status,
             L.safe_wal_size,
             L.two_phase,
+            L.last_inactive_time,
             L.conflicting,
             L.invalidation_reason,
             L.failover,
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index cdf0c450c5..eaa7492569 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -409,6 +409,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	slot->candidate_restart_valid = InvalidXLogRecPtr;
 	slot->candidate_restart_lsn = InvalidXLogRecPtr;
 	slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
+	slot->last_inactive_time = 0;
 
 	/*
 	 * Create the slot on disk.  We haven't actually marked the slot allocated
@@ -622,6 +623,11 @@ retry:
 	if (SlotIsLogical(s))
 		pgstat_acquire_replslot(s);
 
+	/* Reset the last inactive time as the slot is active now. */
+	SpinLockAcquire(&s->mutex);
+	s->last_inactive_time = 0;
+	SpinLockRelease(&s->mutex);
+
 	if (am_walsender)
 	{
 		ereport(log_replication_commands ?
 				LOG : DEBUG1,
@@ -645,6 +651,7 @@ ReplicationSlotRelease(void)
 	ReplicationSlot *slot = MyReplicationSlot;
 	char	   *slotname = NULL;	/* keep compiler quiet */
 	bool		is_logical = false; /* keep compiler quiet */
+	TimestampTz now = 0;
 
 	Assert(slot != NULL && slot->active_pid != 0);
 
@@ -679,6 +686,15 @@ ReplicationSlotRelease(void)
 		ReplicationSlotsComputeRequiredXmin(false);
 	}
 
+	/*
+	 * Set the last inactive time after marking the slot inactive. We don't set
+	 * it for the slots currently being synced from the primary to the standby
+	 * because such slots are typically inactive as decoding is not allowed on
+	 * those.
+	 */
+	if (!(RecoveryInProgress() && slot->data.synced))
+		now = GetCurrentTimestamp();
+
 	if (slot->data.persistency == RS_PERSISTENT)
 	{
 		/*
@@ -687,9 +703,16 @@ ReplicationSlotRelease(void)
 		 */
 		SpinLockAcquire(&slot->mutex);
 		slot->active_pid = 0;
+		slot->last_inactive_time = now;
 		SpinLockRelease(&slot->mutex);
 		ConditionVariableBroadcast(&slot->active_cv);
 	}
+	else
+	{
+		SpinLockAcquire(&slot->mutex);
+		slot->last_inactive_time = now;
+		SpinLockRelease(&slot->mutex);
+	}
 
 	MyReplicationSlot = NULL;
 
@@ -2342,6 +2365,18 @@ RestoreSlotFromDisk(const char *name)
 		slot->in_use = true;
 		slot->active_pid = 0;
 
+		/*
+		 * We set the last inactive time after loading the slot from the disk
+		 * into memory. Whoever acquires the slot i.e. makes the slot active
+		 * will reset it. We don't set it for the slots currently being synced
+		 * from the primary to the standby because such slots are typically
+		 * inactive as decoding is not allowed on those.
+		 */
+		if (!(RecoveryInProgress() && slot->data.synced))
+			slot->last_inactive_time = GetCurrentTimestamp();
+		else
+			slot->last_inactive_time = 0;
+
 		restored = true;
 		break;
 	}
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 4232c1e52e..24f5e6d90a 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -239,7 +239,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 18
+#define PG_GET_REPLICATION_SLOTS_COLS 19
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	XLogRecPtr	currlsn;
 	int			slotno;
@@ -410,6 +410,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
 		values[i++] = BoolGetDatum(slot_contents.data.two_phase);
 
+		if (slot_contents.last_inactive_time > 0)
+			values[i++] = TimestampTzGetDatum(slot_contents.last_inactive_time);
+		else
+			nulls[i++] = true;
+
 		cause = slot_contents.data.invalidated;
 
 		if (SlotIsPhysical(&slot_contents))
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index f042d16832..2b1699b157 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -57,6 +57,6 @@
  */
 
 /*							yyyymmddN */
-#define CATALOG_VERSION_NO	202403222
+#define CATALOG_VERSION_NO	202403251
 
 #endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 71c74350a0..0d26e5b422 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11133,9 +11133,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool,text,bool,bool}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting,invalidation_reason,failover,synced}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,timestamptz,bool,text,bool,bool}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,last_inactive_time,conflicting,invalidation_reason,failover,synced}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 7f25a083ee..eefd7abd39 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -201,6 +201,9 @@ typedef struct ReplicationSlot
 	 * forcibly flushed or not.
 	 */
 	XLogRecPtr	last_saved_confirmed_flush;
+
+	/* The time at which this slot becomes inactive */
+	TimestampTz last_inactive_time;
 } ReplicationSlot;
 
 #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
diff --git a/src/test/recovery/t/019_replslot_limit.pl b/src/test/recovery/t/019_replslot_limit.pl
index fe00370c3e..3409cf88cd 100644
--- a/src/test/recovery/t/019_replslot_limit.pl
+++ b/src/test/recovery/t/019_replslot_limit.pl
@@ -410,4 +410,156 @@ kill 'CONT', $receiverpid;
 $node_primary3->stop;
 $node_standby3->stop;
 
+# =============================================================================
+# Testcase start: Check last_inactive_time property of the streaming standby's slot
+#
+
+# Initialize primary node
+my $primary4 = PostgreSQL::Test::Cluster->new('primary4');
+$primary4->init(allows_streaming => 'logical');
+$primary4->start;
+
+# Take backup
+$backup_name = 'my_backup4';
+$primary4->backup($backup_name);
+
+# Create a standby linking to the primary using the replication slot
+my $standby4 = PostgreSQL::Test::Cluster->new('standby4');
+$standby4->init_from_backup($primary4, $backup_name, has_streaming => 1);
+
+my $sb4_slot = 'sb4_slot';
+$standby4->append_conf('postgresql.conf', "primary_slot_name = '$sb4_slot'");
+
+my $slot_creation_time = $primary4->safe_psql(
+	'postgres', qq[
+    SELECT current_timestamp;
+]);
+
+$primary4->safe_psql(
+	'postgres', qq[
+    SELECT pg_create_physical_replication_slot(slot_name := '$sb4_slot');
+]);
+
+# Get last_inactive_time value after the slot's creation. Note that the slot
+# is still inactive till it's used by the standby below.
+my $last_inactive_time =
+  capture_and_validate_slot_last_inactive_time($primary4, $sb4_slot, $slot_creation_time);
+
+$standby4->start;
+
+# Wait until standby has replayed enough data
+$primary4->wait_for_catchup($standby4);
+
+# Now the slot is active so last_inactive_time value must be NULL
+is( $primary4->safe_psql(
+		'postgres',
+		qq[SELECT last_inactive_time IS NULL FROM pg_replication_slots WHERE slot_name = '$sb4_slot';]
+	),
+	't',
+	'last inactive time for an active physical slot is NULL');
+
+# Stop the standby to check its last_inactive_time value is updated
+$standby4->stop;
+
+# Let's restart the primary so that the last_inactive_time is set upon
+# loading the slot from the disk.
+$primary4->restart;
+
+is( $primary4->safe_psql(
+		'postgres',
+		qq[SELECT last_inactive_time > '$last_inactive_time'::timestamptz FROM pg_replication_slots WHERE slot_name = '$sb4_slot' AND last_inactive_time IS NOT NULL;]
+	),
+	't',
+	'last inactive time for an inactive physical slot is updated correctly');
+
+$standby4->stop;
+
+# Testcase end: Check last_inactive_time property of the streaming standby's slot
+# =============================================================================
+
+# =============================================================================
+# Testcase start: Check last_inactive_time property of the logical subscriber's slot
+my $publisher4 = $primary4;
+
+# Create subscriber node
+my $subscriber4 = PostgreSQL::Test::Cluster->new('subscriber4');
+$subscriber4->init;
+
+# Setup logical replication
+my $publisher4_connstr = $publisher4->connstr . ' dbname=postgres';
+$publisher4->safe_psql('postgres', "CREATE PUBLICATION pub FOR ALL TABLES");
+
+$slot_creation_time = $publisher4->safe_psql(
+	'postgres', qq[
+    SELECT current_timestamp;
+]);
+
+my $lsub4_slot = 'lsub4_slot';
+$publisher4->safe_psql('postgres',
+	"SELECT pg_create_logical_replication_slot(slot_name := '$lsub4_slot', plugin := 'pgoutput');"
+);
+
+# Get last_inactive_time value after the slot's creation. Note that the slot
+# is still inactive till it's used by the subscriber below.
+$last_inactive_time =
+  capture_and_validate_slot_last_inactive_time($publisher4, $lsub4_slot, $slot_creation_time);
+
+$subscriber4->start;
+$subscriber4->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub CONNECTION '$publisher4_connstr' PUBLICATION pub WITH (slot_name = '$lsub4_slot', create_slot = false)"
+);
+
+# Wait until subscriber has caught up
+$subscriber4->wait_for_subscription_sync($publisher4, 'sub');
+
+# Now the slot is active so last_inactive_time value must be NULL
+is( $publisher4->safe_psql(
+		'postgres',
+		qq[SELECT last_inactive_time IS NULL FROM pg_replication_slots WHERE slot_name = '$lsub4_slot';]
+	),
+	't',
+	'last inactive time for an active logical slot is NULL');
+
+# Stop the subscriber to check its last_inactive_time value is updated
+$subscriber4->stop;
+
+# Let's restart the publisher so that the last_inactive_time is set upon
+# loading the slot from the disk.
+$publisher4->restart;
+
+is( $publisher4->safe_psql(
+		'postgres',
+		qq[SELECT last_inactive_time > '$last_inactive_time'::timestamptz FROM pg_replication_slots WHERE slot_name = '$lsub4_slot' AND last_inactive_time IS NOT NULL;]
+	),
+	't',
+	'last inactive time for an inactive logical slot is updated correctly');
+
+# Testcase end: Check last_inactive_time property of the logical subscriber's slot
+# =============================================================================
+
+$publisher4->stop;
+$subscriber4->stop;
+
+# Capture and validate last_inactive_time of a given slot.
+sub capture_and_validate_slot_last_inactive_time
+{
+	my ($node, $slot_name, $slot_creation_time) = @_;
+
+	my $last_inactive_time = $node->safe_psql('postgres',
+		qq(SELECT last_inactive_time FROM pg_replication_slots
+			WHERE slot_name = '$slot_name' AND last_inactive_time IS NOT NULL;)
+	);
+
+	# Check that the captured time is sane
+	is( $node->safe_psql(
+			'postgres',
+			qq[SELECT '$last_inactive_time'::timestamptz > to_timestamp(0) AND
+				'$last_inactive_time'::timestamptz >= '$slot_creation_time'::timestamptz;]
+		),
+		't',
+		"last inactive time for an active slot $slot_name is sane");
+
+	return $last_inactive_time;
+}
+
 done_testing();
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 18829ea586..dfcbaec387 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1473,11 +1473,12 @@ pg_replication_slots| SELECT l.slot_name,
     l.wal_status,
     l.safe_wal_size,
     l.two_phase,
+    l.last_inactive_time,
    l.conflicting,
     l.invalidation_reason,
     l.failover,
     l.synced
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting, invalidation_reason, failover, synced)
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, last_inactive_time, conflicting, invalidation_reason, failover, synced)
     LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
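
A minimal usage sketch, not part of the patch above: once the patch is
applied, the new column can be inspected straight from pg_replication_slots.
The one-day threshold below is an arbitrary illustration, not something this
commit defines.

    -- Illustrative only: slots that have been inactive for more than a day.
    SELECT slot_name, slot_type, active, last_inactive_time
    FROM pg_replication_slots
    WHERE last_inactive_time IS NOT NULL
      AND last_inactive_time < now() - interval '1 day'
    ORDER BY last_inactive_time;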