From 3fa17d37716f978f80dfcdab4e7c73f3a24e7a48 Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Tue, 27 Apr 2021 09:09:11 +0530 Subject: [PATCH] Use HTAB for replication slot statistics. Previously, we used to use the array of size max_replication_slots to store stats for replication slots. But that had two problems in the cases where a message for dropping a slot gets lost: 1) the stats for the new slot are not recorded if the array is full and 2) writing beyond the end of the array if the user reduces the max_replication_slots. This commit uses HTAB for replication slot statistics, resolving both problems. Now, pgstat_vacuum_stat() search for all the dead replication slots in stats hashtable and tell the collector to remove them. To avoid showing the stats for the already-dropped slots, pg_stat_replication_slots view searches slot stats by the slot name taken from pg_replication_slots. Also, we send a message for creating a slot at slot creation, initializing the stats. This reduces the possibility that the stats are accumulated into the old slot stats when a message for dropping a slot gets lost. Reported-by: Andres Freund Author: Sawada Masahiko, test case by Vignesh C Reviewed-by: Amit Kapila, Vignesh C, Dilip Kumar Discussion: https://postgr.es/m/20210319185247.ldebgpdaxsowiflw@alap3.anarazel.de --- contrib/test_decoding/t/001_repl_stats.pl | 69 ++++- src/backend/catalog/system_views.sql | 30 +- src/backend/postmaster/pgstat.c | 340 +++++++++++++--------- src/backend/replication/logical/logical.c | 2 +- src/backend/replication/slot.c | 28 +- src/backend/utils/adt/pgstatfuncs.c | 146 ++++++---- src/include/catalog/catversion.h | 2 +- src/include/catalog/pg_proc.dat | 14 +- src/include/pgstat.h | 10 +- src/include/replication/slot.h | 2 +- src/test/regress/expected/rules.out | 4 +- src/tools/pgindent/typedefs.list | 2 +- 12 files changed, 389 insertions(+), 260 deletions(-) diff --git a/contrib/test_decoding/t/001_repl_stats.pl b/contrib/test_decoding/t/001_repl_stats.pl index 11b6cd9b9c..3ab0e80722 100644 --- a/contrib/test_decoding/t/001_repl_stats.pl +++ b/contrib/test_decoding/t/001_repl_stats.pl @@ -2,9 +2,10 @@ # drop replication slot and restart. use strict; use warnings; +use File::Path qw(rmtree); use PostgresNode; use TestLib; -use Test::More tests => 1; +use Test::More tests => 2; # Test set-up my $node = get_new_node('test'); @@ -12,9 +13,22 @@ $node->init(allows_streaming => 'logical'); $node->append_conf('postgresql.conf', 'synchronous_commit = on'); $node->start; +# Check that replication slot stats are expected. +sub test_slot_stats +{ + my ($node, $expected, $msg) = @_; + + my $result = $node->safe_psql( + 'postgres', qq[ + SELECT slot_name, total_txns > 0 AS total_txn, + total_bytes > 0 AS total_bytes + FROM pg_stat_replication_slots + ORDER BY slot_name]); + is($result, $expected, $msg); +} + # Create table. -$node->safe_psql('postgres', - "CREATE TABLE test_repl_stat(col1 int)"); +$node->safe_psql('postgres', "CREATE TABLE test_repl_stat(col1 int)"); # Create replication slots. $node->safe_psql( @@ -26,7 +40,8 @@ $node->safe_psql( ]); # Insert some data. -$node->safe_psql('postgres', "INSERT INTO test_repl_stat values(generate_series(1, 5));"); +$node->safe_psql('postgres', + "INSERT INTO test_repl_stat values(generate_series(1, 5));"); $node->safe_psql( 'postgres', qq[ @@ -50,27 +65,51 @@ $node->poll_query_until( # Test to drop one of the replication slot and verify replication statistics data is # fine after restart. -$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot4')"); +$node->safe_psql('postgres', + "SELECT pg_drop_replication_slot('regression_slot4')"); $node->stop; $node->start; # Verify statistics data present in pg_stat_replication_slots are sane after # restart. -my $result = $node->safe_psql('postgres', - "SELECT slot_name, total_txns > 0 AS total_txn, - total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots - ORDER BY slot_name" -); -is($result, qq(regression_slot1|t|t +test_slot_stats( + $node, + qq(regression_slot1|t|t regression_slot2|t|t -regression_slot3|t|t), 'check replication statistics are updated'); +regression_slot3|t|t), + 'check replication statistics are updated'); + +# Test to remove one of the replication slots and adjust +# max_replication_slots accordingly to the number of slots. This leads +# to a mismatch between the number of slots present in the stats file and the +# number of stats present in the shared memory, simulating the scenario for +# drop slot message lost by the statistics collector process. We verify +# replication statistics data is fine after restart. + +$node->stop; +my $datadir = $node->data_dir; +my $slot3_replslotdir = "$datadir/pg_replslot/regression_slot3"; + +rmtree($slot3_replslotdir); + +$node->append_conf('postgresql.conf', 'max_replication_slots = 2'); +$node->start; + +# Verify statistics data present in pg_stat_replication_slots are sane after +# restart. +test_slot_stats( + $node, + qq(regression_slot1|t|t +regression_slot2|t|t), + 'check replication statistics after removing the slot file'); # cleanup $node->safe_psql('postgres', "DROP TABLE test_repl_stat"); -$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot1')"); -$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot2')"); -$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot3')"); +$node->safe_psql('postgres', + "SELECT pg_drop_replication_slot('regression_slot1')"); +$node->safe_psql('postgres', + "SELECT pg_drop_replication_slot('regression_slot2')"); # shutdown $node->stop; diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 70e578894f..08f95c43ca 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -866,20 +866,6 @@ CREATE VIEW pg_stat_replication AS JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid) LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid); -CREATE VIEW pg_stat_replication_slots AS - SELECT - s.slot_name, - s.spill_txns, - s.spill_count, - s.spill_bytes, - s.stream_txns, - s.stream_count, - s.stream_bytes, - s.total_txns, - s.total_bytes, - s.stats_reset - FROM pg_stat_get_replication_slots() AS s; - CREATE VIEW pg_stat_slru AS SELECT s.name, @@ -984,6 +970,22 @@ CREATE VIEW pg_replication_slots AS FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); +CREATE VIEW pg_stat_replication_slots AS + SELECT + s.slot_name, + s.spill_txns, + s.spill_count, + s.spill_bytes, + s.stream_txns, + s.stream_count, + s.stream_bytes, + s.total_txns, + s.total_bytes, + s.stats_reset + FROM pg_replication_slots as r, + LATERAL pg_stat_get_replication_slot(slot_name) as s + WHERE r.datoid IS NOT NULL; -- excluding physical slots + CREATE VIEW pg_stat_database AS SELECT D.oid AS datid, diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 6e8dee9784..ba335fd342 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -106,6 +106,7 @@ #define PGSTAT_DB_HASH_SIZE 16 #define PGSTAT_TAB_HASH_SIZE 512 #define PGSTAT_FUNCTION_HASH_SIZE 512 +#define PGSTAT_REPLSLOT_HASH_SIZE 32 /* ---------- @@ -278,8 +279,7 @@ static PgStat_ArchiverStats archiverStats; static PgStat_GlobalStats globalStats; static PgStat_WalStats walStats; static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS]; -static PgStat_ReplSlotStats *replSlotStats; -static int nReplSlotStats; +static HTAB *replSlotStatHash = NULL; static PgStat_RecoveryPrefetchStats recoveryPrefetchStats; /* @@ -319,8 +319,8 @@ static void backend_read_statsfile(void); static bool pgstat_write_statsfile_needed(void); static bool pgstat_db_requested(Oid databaseid); -static int pgstat_replslot_index(const char *name, bool create_it); -static void pgstat_reset_replslot(int i, TimestampTz ts); +static PgStat_StatReplSlotEntry *pgstat_get_replslot_entry(NameData name, bool create_it); +static void pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotstats, TimestampTz ts); static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg); static void pgstat_send_funcstats(void); @@ -1109,6 +1109,24 @@ pgstat_vacuum_stat(void) /* Clean up */ hash_destroy(htab); + /* + * Search for all the dead replication slots in stats hashtable and tell + * the stats collector to drop them. + */ + if (replSlotStatHash) + { + PgStat_StatReplSlotEntry *slotentry; + + hash_seq_init(&hstat, replSlotStatHash); + while ((slotentry = (PgStat_StatReplSlotEntry *) hash_seq_search(&hstat)) != NULL) + { + CHECK_FOR_INTERRUPTS(); + + if (SearchNamedReplicationSlot(NameStr(slotentry->slotname), true) == NULL) + pgstat_report_replslot_drop(NameStr(slotentry->slotname)); + } + } + /* * Lookup our own database entry; if not found, nothing more to do. */ @@ -1516,30 +1534,6 @@ pgstat_reset_replslot_counter(const char *name) if (name) { - ReplicationSlot *slot; - - /* - * Check if the slot exists with the given name. It is possible that by - * the time this message is executed the slot is dropped but at least - * this check will ensure that the given name is for a valid slot. - */ - LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); - slot = SearchNamedReplicationSlot(name); - LWLockRelease(ReplicationSlotControlLock); - - if (!slot) - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("replication slot \"%s\" does not exist", - name))); - - /* - * Nothing to do for physical slots as we collect stats only for - * logical slots. - */ - if (SlotIsPhysical(slot)) - return; - namestrcpy(&msg.m_slotname, name); msg.clearall = false; } @@ -1813,7 +1807,7 @@ pgstat_report_tempfile(size_t filesize) * ---------- */ void -pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat) +pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat) { PgStat_MsgReplSlot msg; @@ -1822,6 +1816,7 @@ pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat) */ pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT); namestrcpy(&msg.m_slotname, NameStr(repSlotStat->slotname)); + msg.m_create = false; msg.m_drop = false; msg.m_spill_txns = repSlotStat->spill_txns; msg.m_spill_count = repSlotStat->spill_count; @@ -1834,6 +1829,24 @@ pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat) pgstat_send(&msg, sizeof(PgStat_MsgReplSlot)); } +/* ---------- + * pgstat_report_replslot_create() - + * + * Tell the collector about creating the replication slot. + * ---------- + */ +void +pgstat_report_replslot_create(const char *slotname) +{ + PgStat_MsgReplSlot msg; + + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT); + namestrcpy(&msg.m_slotname, slotname); + msg.m_create = true; + msg.m_drop = false; + pgstat_send(&msg, sizeof(PgStat_MsgReplSlot)); +} + /* ---------- * pgstat_report_replslot_drop() - * @@ -1847,6 +1860,7 @@ pgstat_report_replslot_drop(const char *slotname) pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT); namestrcpy(&msg.m_slotname, slotname); + msg.m_create = false; msg.m_drop = true; pgstat_send(&msg, sizeof(PgStat_MsgReplSlot)); } @@ -2872,17 +2886,15 @@ pgstat_fetch_slru(void) * pgstat_fetch_replslot() - * * Support function for the SQL-callable pgstat* functions. Returns - * a pointer to the replication slot statistics struct and sets the - * number of entries in nslots_p. + * a pointer to the replication slot statistics struct. * --------- */ -PgStat_ReplSlotStats * -pgstat_fetch_replslot(int *nslots_p) +PgStat_StatReplSlotEntry * +pgstat_fetch_replslot(NameData slotname) { backend_read_statsfile(); - *nslots_p = nReplSlotStats; - return replSlotStats; + return pgstat_get_replslot_entry(slotname, false); } /* @@ -3654,7 +3666,6 @@ pgstat_write_statsfiles(bool permanent, bool allDbs) const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname; const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename; int rc; - int i; elog(DEBUG2, "writing stats file \"%s\"", statfile); @@ -3744,11 +3755,17 @@ pgstat_write_statsfiles(bool permanent, bool allDbs) /* * Write replication slot stats struct */ - for (i = 0; i < nReplSlotStats; i++) + if (replSlotStatHash) { - fputc('R', fpout); - rc = fwrite(&replSlotStats[i], sizeof(PgStat_ReplSlotStats), 1, fpout); - (void) rc; /* we'll check for error with ferror */ + PgStat_StatReplSlotEntry *slotent; + + hash_seq_init(&hstat, replSlotStatHash); + while ((slotent = (PgStat_StatReplSlotEntry *) hash_seq_search(&hstat)) != NULL) + { + fputc('R', fpout); + rc = fwrite(slotent, sizeof(PgStat_StatReplSlotEntry), 1, fpout); + (void) rc; /* we'll check for error with ferror */ + } } /* @@ -3975,12 +3992,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); - /* Allocate the space for replication slot statistics */ - replSlotStats = MemoryContextAllocZero(pgStatLocalContext, - max_replication_slots - * sizeof(PgStat_ReplSlotStats)); - nReplSlotStats = 0; - /* * Clear out global, archiver, WAL and SLRU statistics so they start from * zero in case we can't load an existing statsfile. @@ -4005,12 +4016,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) for (i = 0; i < SLRU_NUM_ELEMENTS; i++) slruStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp; - /* - * Set the same reset timestamp for all replication slots too. - */ - for (i = 0; i < max_replication_slots; i++) - replSlotStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp; - /* * Try to open the stats file. If it doesn't exist, the backends simply * return zero for anything and the collector simply starts from scratch @@ -4197,21 +4202,43 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) break; /* - * 'R' A PgStat_ReplSlotStats struct describing a replication - * slot follows. + * 'R' A PgStat_StatReplSlotEntry struct describing a + * replication slot follows. */ case 'R': - if (fread(&replSlotStats[nReplSlotStats], 1, sizeof(PgStat_ReplSlotStats), fpin) - != sizeof(PgStat_ReplSlotStats)) { - ereport(pgStatRunningInCollector ? LOG : WARNING, - (errmsg("corrupted statistics file \"%s\"", - statfile))); - memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats)); - goto done; + PgStat_StatReplSlotEntry slotbuf; + PgStat_StatReplSlotEntry *slotent; + + if (fread(&slotbuf, 1, sizeof(PgStat_StatReplSlotEntry), fpin) + != sizeof(PgStat_StatReplSlotEntry)) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + goto done; + } + + /* Create hash table if we don't have it already. */ + if (replSlotStatHash == NULL) + { + HASHCTL hash_ctl; + + hash_ctl.keysize = sizeof(NameData); + hash_ctl.entrysize = sizeof(PgStat_StatReplSlotEntry); + hash_ctl.hcxt = pgStatLocalContext; + replSlotStatHash = hash_create("Replication slots hash", + PGSTAT_REPLSLOT_HASH_SIZE, + &hash_ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + } + + slotent = (PgStat_StatReplSlotEntry *) hash_search(replSlotStatHash, + (void *) &slotbuf.slotname, + HASH_ENTER, NULL); + memcpy(slotent, &slotbuf, sizeof(PgStat_StatReplSlotEntry)); + break; } - nReplSlotStats++; - break; case 'E': goto done; @@ -4424,7 +4451,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent, PgStat_ArchiverStats myArchiverStats; PgStat_WalStats myWalStats; PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS]; - PgStat_ReplSlotStats myReplSlotStats; + PgStat_StatReplSlotEntry myReplSlotStats; PgStat_RecoveryPrefetchStats myRecoveryPrefetchStats; FILE *fpin; int32 format_id; @@ -4553,12 +4580,12 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent, break; /* - * 'R' A PgStat_ReplSlotStats struct describing a replication - * slot follows. + * 'R' A PgStat_StatReplSlotEntry struct describing a + * replication slot follows. */ case 'R': - if (fread(&myReplSlotStats, 1, sizeof(PgStat_ReplSlotStats), fpin) - != sizeof(PgStat_ReplSlotStats)) + if (fread(&myReplSlotStats, 1, sizeof(PgStat_StatReplSlotEntry), fpin) + != sizeof(PgStat_StatReplSlotEntry)) { ereport(pgStatRunningInCollector ? LOG : WARNING, (errmsg("corrupted statistics file \"%s\"", @@ -4764,8 +4791,7 @@ pgstat_clear_snapshot(void) /* Reset variables */ pgStatLocalContext = NULL; pgStatDBHash = NULL; - replSlotStats = NULL; - nReplSlotStats = 0; + replSlotStatHash = NULL; /* * Historically the backend_status.c facilities lived in this file, and @@ -5189,20 +5215,26 @@ static void pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, int len) { - int i; - int idx = -1; + PgStat_StatReplSlotEntry *slotent; TimestampTz ts; + /* Return if we don't have replication slot statistics */ + if (replSlotStatHash == NULL) + return; + ts = GetCurrentTimestamp(); if (msg->clearall) { - for (i = 0; i < nReplSlotStats; i++) - pgstat_reset_replslot(i, ts); + HASH_SEQ_STATUS sstat; + + hash_seq_init(&sstat, replSlotStatHash); + while ((slotent = (PgStat_StatReplSlotEntry *) hash_seq_search(&sstat)) != NULL) + pgstat_reset_replslot(slotent, ts); } else { - /* Get the index of replication slot statistics to reset */ - idx = pgstat_replslot_index(NameStr(msg->m_slotname), false); + /* Get the slot statistics to reset */ + slotent = pgstat_get_replslot_entry(msg->m_slotname, false); /* * Nothing to do if the given slot entry is not found. This could @@ -5210,11 +5242,11 @@ pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, * corresponding statistics entry is also removed before receiving the * reset message. */ - if (idx < 0) + if (!slotent) return; /* Reset the stats for the requested replication slot */ - pgstat_reset_replslot(idx, ts); + pgstat_reset_replslot(slotent, ts); } } @@ -5532,46 +5564,45 @@ pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len) static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len) { - int idx; - - /* - * Get the index of replication slot statistics. On dropping, we don't - * create the new statistics. - */ - idx = pgstat_replslot_index(NameStr(msg->m_slotname), !msg->m_drop); - - /* - * The slot entry is not found or there is no space to accommodate the new - * entry. This could happen when the message for the creation of a slot - * reached before the drop message even though the actual operations - * happen in reverse order. In such a case, the next update of the - * statistics for the same slot will create the required entry. - */ - if (idx < 0) - return; - - /* it must be a valid replication slot index */ - Assert(idx < nReplSlotStats); - if (msg->m_drop) { + Assert(!msg->m_create); + /* Remove the replication slot statistics with the given name */ - if (idx < nReplSlotStats - 1) - memcpy(&replSlotStats[idx], &replSlotStats[nReplSlotStats - 1], - sizeof(PgStat_ReplSlotStats)); - nReplSlotStats--; + if (replSlotStatHash != NULL) + (void) hash_search(replSlotStatHash, + (void *) &(msg->m_slotname), + HASH_REMOVE, + NULL); } else { - /* Update the replication slot statistics */ - replSlotStats[idx].spill_txns += msg->m_spill_txns; - replSlotStats[idx].spill_count += msg->m_spill_count; - replSlotStats[idx].spill_bytes += msg->m_spill_bytes; - replSlotStats[idx].stream_txns += msg->m_stream_txns; - replSlotStats[idx].stream_count += msg->m_stream_count; - replSlotStats[idx].stream_bytes += msg->m_stream_bytes; - replSlotStats[idx].total_txns += msg->m_total_txns; - replSlotStats[idx].total_bytes += msg->m_total_bytes; + PgStat_StatReplSlotEntry *slotent; + + slotent = pgstat_get_replslot_entry(msg->m_slotname, true); + Assert(slotent); + + if (msg->m_create) + { + /* + * If the message for dropping the slot with the same name gets + * lost, slotent has stats for the old slot. So we initialize all + * counters at slot creation. + */ + pgstat_reset_replslot(slotent, 0); + } + else + { + /* Update the replication slot statistics */ + slotent->spill_txns += msg->m_spill_txns; + slotent->spill_count += msg->m_spill_count; + slotent->spill_bytes += msg->m_spill_bytes; + slotent->stream_txns += msg->m_stream_txns; + slotent->stream_count += msg->m_stream_count; + slotent->stream_bytes += msg->m_stream_bytes; + slotent->total_txns += msg->m_total_txns; + slotent->total_bytes += msg->m_total_bytes; + } } } @@ -5749,59 +5780,80 @@ pgstat_db_requested(Oid databaseid) } /* ---------- - * pgstat_replslot_index + * pgstat_replslot_entry * - * Return the index of entry of a replication slot with the given name, or - * -1 if the slot is not found. + * Return the entry of replication slot stats with the given name. Return + * NULL if not found and the caller didn't request to create it. * - * create_it tells whether to create the new slot entry if it is not found. + * create tells whether to create the new slot entry if it is not found. * ---------- */ -static int -pgstat_replslot_index(const char *name, bool create_it) +static PgStat_StatReplSlotEntry * +pgstat_get_replslot_entry(NameData name, bool create) { - int i; + PgStat_StatReplSlotEntry *slotent; + bool found; - Assert(nReplSlotStats <= max_replication_slots); - for (i = 0; i < nReplSlotStats; i++) + if (replSlotStatHash == NULL) { - if (namestrcmp(&replSlotStats[i].slotname, name) == 0) - return i; /* found */ + HASHCTL hash_ctl; + + /* + * Quick return NULL if the hash table is empty and the caller didn't + * request to create the entry. + */ + if (!create) + return NULL; + + hash_ctl.keysize = sizeof(NameData); + hash_ctl.entrysize = sizeof(PgStat_StatReplSlotEntry); + replSlotStatHash = hash_create("Replication slots hash", + PGSTAT_REPLSLOT_HASH_SIZE, + &hash_ctl, + HASH_ELEM | HASH_BLOBS); } - /* - * The slot is not found. We don't want to register the new statistics if - * the list is already full or the caller didn't request. - */ - if (i == max_replication_slots || !create_it) - return -1; + slotent = (PgStat_StatReplSlotEntry *) hash_search(replSlotStatHash, + (void *) &name, + create ? HASH_ENTER : HASH_FIND, + &found); - /* Register new slot */ - memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats)); - namestrcpy(&replSlotStats[nReplSlotStats].slotname, name); + if (!slotent) + { + /* not found */ + Assert(!create && !found); + return NULL; + } - return nReplSlotStats++; + /* initialize the entry */ + if (create && !found) + { + namestrcpy(&(slotent->slotname), NameStr(name)); + pgstat_reset_replslot(slotent, 0); + } + + return slotent; } /* ---------- * pgstat_reset_replslot * - * Reset the replication slot stats at index 'i'. + * Reset the given replication slot stats. * ---------- */ static void -pgstat_reset_replslot(int i, TimestampTz ts) +pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotent, TimestampTz ts) { /* reset only counters. Don't clear slot name */ - replSlotStats[i].spill_txns = 0; - replSlotStats[i].spill_count = 0; - replSlotStats[i].spill_bytes = 0; - replSlotStats[i].stream_txns = 0; - replSlotStats[i].stream_count = 0; - replSlotStats[i].stream_bytes = 0; - replSlotStats[i].total_txns = 0; - replSlotStats[i].total_bytes = 0; - replSlotStats[i].stat_reset_timestamp = ts; + slotent->spill_txns = 0; + slotent->spill_count = 0; + slotent->spill_bytes = 0; + slotent->stream_txns = 0; + slotent->stream_count = 0; + slotent->stream_bytes = 0; + slotent->total_txns = 0; + slotent->total_bytes = 0; + slotent->stat_reset_timestamp = ts; } /* diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 35b0c67641..00543ede45 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1773,7 +1773,7 @@ void UpdateDecodingStats(LogicalDecodingContext *ctx) { ReorderBuffer *rb = ctx->reorder; - PgStat_ReplSlotStats repSlotStat; + PgStat_StatReplSlotEntry repSlotStat; /* Nothing to do if we don't have any replication stats to be sent. */ if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0) diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index f61b163f78..cf261e200e 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -328,12 +328,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, * ReplicationSlotAllocationLock. */ if (SlotIsLogical(slot)) - { - PgStat_ReplSlotStats repSlotStat; - MemSet(&repSlotStat, 0, sizeof(PgStat_ReplSlotStats)); - namestrcpy(&repSlotStat.slotname, NameStr(slot->data.name)); - pgstat_report_replslot(&repSlotStat); - } + pgstat_report_replslot_create(NameStr(slot->data.name)); /* * Now that the slot has been marked as in_use and active, it's safe to @@ -349,17 +344,15 @@ ReplicationSlotCreate(const char *name, bool db_specific, * Search for the named replication slot. * * Return the replication slot if found, otherwise NULL. - * - * The caller must hold ReplicationSlotControlLock in shared mode. */ ReplicationSlot * -SearchNamedReplicationSlot(const char *name) +SearchNamedReplicationSlot(const char *name, bool need_lock) { int i; - ReplicationSlot *slot = NULL; + ReplicationSlot *slot = NULL; - Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, - LW_SHARED)); + if (need_lock) + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (i = 0; i < max_replication_slots; i++) { @@ -372,6 +365,9 @@ SearchNamedReplicationSlot(const char *name) } } + if (need_lock) + LWLockRelease(ReplicationSlotControlLock); + return slot; } @@ -416,7 +412,7 @@ retry: * Search for the slot with the specified name if the slot to acquire is * not given. If the slot is not found, we either return -1 or error out. */ - s = slot ? slot : SearchNamedReplicationSlot(name); + s = slot ? slot : SearchNamedReplicationSlot(name, false); if (s == NULL || !s->in_use) { LWLockRelease(ReplicationSlotControlLock); @@ -713,6 +709,12 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) * reduce that possibility. If the messages reached in reverse, we would * lose one statistics update message. But the next update message will * create the statistics for the replication slot. + * + * XXX In case, the messages for creation and drop slot of the same name + * get lost and create happens before (auto)vacuum cleans up the dead + * slot, the stats will be accumulated into the old slot. One can imagine + * having OIDs for each slot to avoid the accumulation of stats but that + * doesn't seem worth doing as in practice this won't happen frequently. */ if (SlotIsLogical(slot)) pgstat_report_replslot_drop(NameStr(slot->data.name)); diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 87f02d572e..14056f5347 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -24,6 +24,7 @@ #include "pgstat.h" #include "postmaster/bgworker_internals.h" #include "postmaster/postmaster.h" +#include "replication/slot.h" #include "storage/proc.h" #include "storage/procarray.h" #include "utils/acl.h" @@ -2207,8 +2208,33 @@ pg_stat_reset_replication_slot(PG_FUNCTION_ARGS) char *target = NULL; if (!PG_ARGISNULL(0)) + { + ReplicationSlot *slot; + target = text_to_cstring(PG_GETARG_TEXT_PP(0)); + /* + * Check if the slot exists with the given name. It is possible that + * by the time this message is executed the slot is dropped but at + * least this check will ensure that the given name is for a valid + * slot. + */ + slot = SearchNamedReplicationSlot(target, true); + + if (!slot) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("replication slot \"%s\" does not exist", + target))); + + /* + * Nothing to do for physical slots as we collect stats only for + * logical slots. + */ + if (SlotIsPhysical(slot)) + PG_RETURN_VOID(); + } + pgstat_reset_replslot_counter(target); PG_RETURN_VOID(); @@ -2280,73 +2306,77 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS) PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); } -/* Get the statistics for the replication slots */ +/* + * Get the statistics for the replication slot. If the slot statistics is not + * available, return all-zeroes stats. + */ Datum -pg_stat_get_replication_slots(PG_FUNCTION_ARGS) +pg_stat_get_replication_slot(PG_FUNCTION_ARGS) { #define PG_STAT_GET_REPLICATION_SLOT_COLS 10 - ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + text *slotname_text = PG_GETARG_TEXT_P(0); + NameData slotname; TupleDesc tupdesc; - Tuplestorestate *tupstore; - MemoryContext per_query_ctx; - MemoryContext oldcontext; - PgStat_ReplSlotStats *slotstats; - int nstats; - int i; + Datum values[10]; + bool nulls[10]; + PgStat_StatReplSlotEntry *slotent; + PgStat_StatReplSlotEntry allzero; - /* check to see if caller supports us returning a tuplestore */ - if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("set-valued function called in context that cannot accept a set"))); - if (!(rsinfo->allowedModes & SFRM_Materialize)) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("materialize mode required, but it is not allowed in this context"))); + /* Initialise values and NULL flags arrays */ + MemSet(values, 0, sizeof(values)); + MemSet(nulls, 0, sizeof(nulls)); - /* Build a tuple descriptor for our result type */ - if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) - elog(ERROR, "return type must be a row type"); + /* Initialise attributes information in the tuple descriptor */ + tupdesc = CreateTemplateTupleDesc(PG_STAT_GET_REPLICATION_SLOT_COLS); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "slot_name", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "spill_txns", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "spill_count", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "spill_bytes", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 5, "stream_txns", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 6, "stream_count", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 7, "stream_bytes", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 8, "total_txns", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_bytes", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset", + TIMESTAMPTZOID, -1, 0); + BlessTupleDesc(tupdesc); - per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; - oldcontext = MemoryContextSwitchTo(per_query_ctx); - - tupstore = tuplestore_begin_heap(true, false, work_mem); - rsinfo->returnMode = SFRM_Materialize; - rsinfo->setResult = tupstore; - rsinfo->setDesc = tupdesc; - - MemoryContextSwitchTo(oldcontext); - - slotstats = pgstat_fetch_replslot(&nstats); - for (i = 0; i < nstats; i++) + namestrcpy(&slotname, text_to_cstring(slotname_text)); + slotent = pgstat_fetch_replslot(slotname); + if (!slotent) { - Datum values[PG_STAT_GET_REPLICATION_SLOT_COLS]; - bool nulls[PG_STAT_GET_REPLICATION_SLOT_COLS]; - PgStat_ReplSlotStats *s = &(slotstats[i]); - - MemSet(values, 0, sizeof(values)); - MemSet(nulls, 0, sizeof(nulls)); - - values[0] = CStringGetTextDatum(NameStr(s->slotname)); - values[1] = Int64GetDatum(s->spill_txns); - values[2] = Int64GetDatum(s->spill_count); - values[3] = Int64GetDatum(s->spill_bytes); - values[4] = Int64GetDatum(s->stream_txns); - values[5] = Int64GetDatum(s->stream_count); - values[6] = Int64GetDatum(s->stream_bytes); - values[7] = Int64GetDatum(s->total_txns); - values[8] = Int64GetDatum(s->total_bytes); - - if (s->stat_reset_timestamp == 0) - nulls[9] = true; - else - values[9] = TimestampTzGetDatum(s->stat_reset_timestamp); - - tuplestore_putvalues(tupstore, tupdesc, values, nulls); + /* + * If the slot is not found, initialise its stats. This is possible if + * the create slot message is lost. + */ + memset(&allzero, 0, sizeof(PgStat_StatReplSlotEntry)); + slotent = &allzero; } - tuplestore_donestoring(tupstore); + values[0] = CStringGetTextDatum(NameStr(slotname)); + values[1] = Int64GetDatum(slotent->spill_txns); + values[2] = Int64GetDatum(slotent->spill_count); + values[3] = Int64GetDatum(slotent->spill_bytes); + values[4] = Int64GetDatum(slotent->stream_txns); + values[5] = Int64GetDatum(slotent->stream_count); + values[6] = Int64GetDatum(slotent->stream_bytes); + values[7] = Int64GetDatum(slotent->total_txns); + values[8] = Int64GetDatum(slotent->total_bytes); - return (Datum) 0; + if (slotent->stat_reset_timestamp == 0) + nulls[9] = true; + else + values[9] = TimestampTzGetDatum(slotent->stat_reset_timestamp); + + /* Returns the record as Datum */ + PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); } diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index ba1a0d0333..22dcd0a270 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202104231 +#define CATALOG_VERSION_NO 202104271 #endif diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index db1abc149c..91f0ea2212 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5308,14 +5308,14 @@ proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', proargnames => '{pid,status,receive_start_lsn,receive_start_tli,written_lsn,flushed_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,sender_host,sender_port,conninfo}', prosrc => 'pg_stat_get_wal_receiver' }, -{ oid => '8595', descr => 'statistics: information about replication slots', - proname => 'pg_stat_get_replication_slots', prorows => '10', +{ oid => '8595', descr => 'statistics: information about replication slot', + proname => 'pg_stat_get_replication_slot', prorows => '1', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', - prorettype => 'record', proargtypes => '', - proallargtypes => '{text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}', - prosrc => 'pg_stat_get_replication_slots' }, + prorettype => 'record', proargtypes => 'text', + proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}', + prosrc => 'pg_stat_get_replication_slot' }, { oid => '6118', descr => 'statistics: information about subscription', proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 5c5920b0b5..1ce363e7d1 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -541,6 +541,7 @@ typedef struct PgStat_MsgReplSlot { PgStat_MsgHdr m_hdr; NameData m_slotname; + bool m_create; bool m_drop; PgStat_Counter m_spill_txns; PgStat_Counter m_spill_count; @@ -917,7 +918,7 @@ typedef struct PgStat_SLRUStats /* * Replication slot statistics kept in the stats collector */ -typedef struct PgStat_ReplSlotStats +typedef struct PgStat_StatReplSlotEntry { NameData slotname; PgStat_Counter spill_txns; @@ -929,7 +930,7 @@ typedef struct PgStat_ReplSlotStats PgStat_Counter total_txns; PgStat_Counter total_bytes; TimestampTz stat_reset_timestamp; -} PgStat_ReplSlotStats; +} PgStat_StatReplSlotEntry; /* @@ -1031,7 +1032,8 @@ extern void pgstat_report_recovery_conflict(int reason); extern void pgstat_report_deadlock(void); extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount); extern void pgstat_report_checksum_failure(void); -extern void pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat); +extern void pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat); +extern void pgstat_report_replslot_create(const char *slotname); extern void pgstat_report_replslot_drop(const char *slotname); extern void pgstat_initialize(void); @@ -1129,7 +1131,7 @@ extern PgStat_ArchiverStats *pgstat_fetch_stat_archiver(void); extern PgStat_GlobalStats *pgstat_fetch_global(void); extern PgStat_WalStats *pgstat_fetch_stat_wal(void); extern PgStat_SLRUStats *pgstat_fetch_slru(void); -extern PgStat_ReplSlotStats *pgstat_fetch_replslot(int *nslots_p); +extern PgStat_StatReplSlotEntry *pgstat_fetch_replslot(NameData slotname); extern PgStat_RecoveryPrefetchStats *pgstat_fetch_recoveryprefetch(void); extern void pgstat_count_slru_page_zeroed(int slru_idx); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 1ad5e6c50d..357068403a 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -223,7 +223,7 @@ extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive); extern void ReplicationSlotsDropDBSlots(Oid dboid); extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno); -extern ReplicationSlot *SearchNamedReplicationSlot(const char *name); +extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock); extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, int szslot); extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 6dff5439e0..572bc2057c 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2071,7 +2071,9 @@ pg_stat_replication_slots| SELECT s.slot_name, s.total_txns, s.total_bytes, s.stats_reset - FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset); + FROM pg_replication_slots r, + LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset) + WHERE (r.datoid IS NOT NULL); pg_stat_slru| SELECT s.name, s.blks_zeroed, s.blks_hit, diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index c7aff677d4..878b67a276 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1870,12 +1870,12 @@ PgStat_MsgTabstat PgStat_MsgTempFile PgStat_MsgVacuum PgStat_MsgWal -PgStat_ReplSlotStats PgStat_SLRUStats PgStat_Shared_Reset_Target PgStat_Single_Reset_Type PgStat_StatDBEntry PgStat_StatFuncEntry +PgStat_StatReplSlotEntry PgStat_StatTabEntry PgStat_SubXactStatus PgStat_TableCounts