diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 171ba7049c..66566765f0 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -314,6 +314,15 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser + + pg_stat_replication_slotspg_stat_replication_slots + One row per replication slot, showing statistics about + replication slot usage. + See + pg_stat_replication_slots for details. + + + pg_stat_wal_receiverpg_stat_wal_receiver Only one row, showing statistics about the WAL receiver from @@ -2552,6 +2561,88 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i + + <structname>pg_stat_replication_slots</structname> + + + pg_stat_replication_slots + + + + The pg_stat_replication_slots view will contain + one row per logical replication slot, showing statistics about its usage. + + + + <structname>pg_stat_replication_slots</structname> View + + + + + Column Type + + + Description + + + + + + + + name text + + + A unique, cluster-wide identifier for the replication slot + + + + + + spill_txns bigint + + + Number of transactions spilled to disk after the memory used by + logical decoding exceeds logical_decoding_work_mem. The + counter gets incremented both for toplevel transactions and + subtransactions. + + + + + + spill_count bigint + + + Number of times transactions were spilled to disk. Transactions + may get spilled repeatedly, and this counter gets incremented on every + such invocation. + + + + + + spill_bytes bigint + + + Amount of decoded transaction data spilled to disk. + + + + + + stats_reset timestamp with time zone + + + Time at which these statistics were last reset + + + + +
+ +
+ <structname>pg_stat_wal_receiver</structname> @@ -4802,6 +4893,27 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i can be granted EXECUTE to run the function.
+ + + + + pg_stat_reset_replication_slot + + pg_stat_reset_replication_slot ( text ) + void + + + Resets statistics to zero for a single replication slot, or for all + replication slots in the cluster. The argument can be either the name + of the slot to reset the stats or NULL. If the argument is NULL, all + counters shown in the pg_stat_replication_slots + view for all replication slots are reset. + + + This function is restricted to superusers by default, but other users + can be granted EXECUTE to run the function. + + diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 923c2e2be1..c29390760f 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -796,6 +796,15 @@ CREATE VIEW pg_stat_replication AS JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid) LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid); +CREATE VIEW pg_stat_replication_slots AS + SELECT + s.name, + s.spill_txns, + s.spill_count, + s.spill_bytes, + s.stats_reset + FROM pg_stat_get_replication_slots() AS s; + CREATE VIEW pg_stat_slru AS SELECT s.name, @@ -1453,6 +1462,7 @@ REVOKE EXECUTE ON FUNCTION pg_stat_reset_shared(text) FROM public; REVOKE EXECUTE ON FUNCTION pg_stat_reset_slru(text) FROM public; REVOKE EXECUTE ON FUNCTION pg_stat_reset_single_table_counters(oid) FROM public; REVOKE EXECUTE ON FUNCTION pg_stat_reset_single_function_counters(oid) FROM public; +REVOKE EXECUTE ON FUNCTION pg_stat_reset_replication_slot(text) FROM public; REVOKE EXECUTE ON FUNCTION lo_import(text) FROM public; REVOKE EXECUTE ON FUNCTION lo_import(text, oid) FROM public; diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 5294c78549..822f0ebc62 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -51,6 +51,7 @@ #include "postmaster/fork_process.h" #include "postmaster/interrupt.h" #include "postmaster/postmaster.h" +#include "replication/slot.h" #include "replication/walsender.h" #include "storage/backendid.h" #include "storage/dsm.h" @@ -284,6 +285,8 @@ static PgStat_ArchiverStats archiverStats; static PgStat_GlobalStats globalStats; static PgStat_WalStats walStats; static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS]; +static PgStat_ReplSlotStats *replSlotStats; +static int nReplSlotStats; /* * List of OIDs of databases we need to write out. If an entry is InvalidOid, @@ -324,6 +327,9 @@ static void pgstat_read_current_status(void); static bool pgstat_write_statsfile_needed(void); static bool pgstat_db_requested(Oid databaseid); +static int pgstat_replslot_index(const char *name, bool create_it); +static void pgstat_reset_replslot(int i, TimestampTz ts); + static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg); static void pgstat_send_funcstats(void); static void pgstat_send_slru(void); @@ -350,6 +356,7 @@ static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len); static void pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, int len); static void pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len); static void pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, int len); +static void pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, int len); static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len); static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len); static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len); @@ -362,6 +369,7 @@ static void pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len); static void pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len); static void pgstat_recv_deadlock(PgStat_MsgDeadlock *msg, int len); static void pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len); +static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len); static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len); /* ------------------------------------------------------------ @@ -1437,6 +1445,61 @@ pgstat_reset_slru_counter(const char *name) pgstat_send(&msg, sizeof(msg)); } +/* ---------- + * pgstat_reset_replslot_counter() - + * + * Tell the statistics collector to reset a single replication slot + * counter, or all replication slots counters (when name is null). + * + * Permission checking for this function is managed through the normal + * GRANT system. + * ---------- + */ +void +pgstat_reset_replslot_counter(const char *name) +{ + PgStat_MsgResetreplslotcounter msg; + + if (pgStatSock == PGINVALID_SOCKET) + return; + + if (name) + { + ReplicationSlot *slot; + + /* + * Check if the slot exits with the given name. It is possible that by + * the time this message is executed the slot is dropped but at least + * this check will ensure that the given name is for a valid slot. + */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + slot = SearchNamedReplicationSlot(name); + LWLockRelease(ReplicationSlotControlLock); + + if (!slot) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("replication slot \"%s\" does not exist", + name))); + + /* + * Nothing to do for physical slots as we collect stats only for + * logical slots. + */ + if (SlotIsPhysical(slot)) + return; + + memcpy(&msg.m_slotname, name, NAMEDATALEN); + msg.clearall = false; + } + else + msg.clearall = true; + + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETREPLSLOTCOUNTER); + + pgstat_send(&msg, sizeof(msg)); +} + /* ---------- * pgstat_report_autovac() - * @@ -1637,6 +1700,46 @@ pgstat_report_tempfile(size_t filesize) pgstat_send(&msg, sizeof(msg)); } +/* ---------- + * pgstat_report_replslot() - + * + * Tell the collector about replication slot statistics. + * ---------- + */ +void +pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount, + int spillbytes) +{ + PgStat_MsgReplSlot msg; + + /* + * Prepare and send the message + */ + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT); + memcpy(&msg.m_slotname, slotname, NAMEDATALEN); + msg.m_drop = false; + msg.m_spill_txns = spilltxns; + msg.m_spill_count = spillcount; + msg.m_spill_bytes = spillbytes; + pgstat_send(&msg, sizeof(PgStat_MsgReplSlot)); +} + +/* ---------- + * pgstat_report_replslot_drop() - + * + * Tell the collector about dropping the replication slot. + * ---------- + */ +void +pgstat_report_replslot_drop(const char *slotname) +{ + PgStat_MsgReplSlot msg; + + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT); + memcpy(&msg.m_slotname, slotname, NAMEDATALEN); + msg.m_drop = true; + pgstat_send(&msg, sizeof(PgStat_MsgReplSlot)); +} /* ---------- * pgstat_ping() - @@ -2714,6 +2817,23 @@ pgstat_fetch_slru(void) return slruStats; } +/* + * --------- + * pgstat_fetch_replslot() - + * + * Support function for the SQL-callable pgstat* functions. Returns + * a pointer to the replication slot statistics struct and sets the + * number of entries in nslots_p. + * --------- + */ +PgStat_ReplSlotStats * +pgstat_fetch_replslot(int *nslots_p) +{ + backend_read_statsfile(); + + *nslots_p = nReplSlotStats; + return replSlotStats; +} /* ------------------------------------------------------------ * Functions for management of the shared-memory PgBackendStatus array @@ -4693,6 +4813,11 @@ PgstatCollectorMain(int argc, char *argv[]) len); break; + case PGSTAT_MTYPE_RESETREPLSLOTCOUNTER: + pgstat_recv_resetreplslotcounter(&msg.msg_resetreplslotcounter, + len); + break; + case PGSTAT_MTYPE_AUTOVAC_START: pgstat_recv_autovac(&msg.msg_autovacuum_start, len); break; @@ -4747,6 +4872,10 @@ PgstatCollectorMain(int argc, char *argv[]) len); break; + case PGSTAT_MTYPE_REPLSLOT: + pgstat_recv_replslot(&msg.msg_replslot, len); + break; + default: break; } @@ -4946,6 +5075,7 @@ pgstat_write_statsfiles(bool permanent, bool allDbs) const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname; const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename; int rc; + int i; elog(DEBUG2, "writing stats file \"%s\"", statfile); @@ -5025,6 +5155,16 @@ pgstat_write_statsfiles(bool permanent, bool allDbs) (void) rc; /* we'll check for error with ferror */ } + /* + * Write replication slot stats struct + */ + for (i = 0; i < nReplSlotStats; i++) + { + fputc('R', fpout); + rc = fwrite(&replSlotStats[i], sizeof(PgStat_ReplSlotStats), 1, fpout); + (void) rc; /* we'll check for error with ferror */ + } + /* * No more output to be done. Close the temp file and replace the old * pgstat.stat with it. The ferror() check replaces testing for error @@ -5250,6 +5390,10 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + /* Allocate the space for replication slot statistics */ + replSlotStats = palloc0(max_replication_slots * sizeof(PgStat_ReplSlotStats)); + nReplSlotStats = 0; + /* * Clear out global, archiver, WAL and SLRU statistics so they start from * zero in case we can't load an existing statsfile. @@ -5273,6 +5417,12 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) for (i = 0; i < SLRU_NUM_ELEMENTS; i++) slruStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp; + /* + * Set the same reset timestamp for all replication slots too. + */ + for (i = 0; i < max_replication_slots; i++) + replSlotStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp; + /* * Try to open the stats file. If it doesn't exist, the backends simply * return zero for anything and the collector simply starts from scratch @@ -5447,6 +5597,23 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) break; + /* + * 'R' A PgStat_ReplSlotStats struct describing a replication + * slot follows. + */ + case 'R': + if (fread(&replSlotStats[nReplSlotStats], 1, sizeof(PgStat_ReplSlotStats), fpin) + != sizeof(PgStat_ReplSlotStats)) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats)); + goto done; + } + nReplSlotStats++; + break; + case 'E': goto done; @@ -5658,6 +5825,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent, PgStat_ArchiverStats myArchiverStats; PgStat_WalStats myWalStats; PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS]; + PgStat_ReplSlotStats myReplSlotStats; FILE *fpin; int32 format_id; const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename; @@ -5772,6 +5940,22 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent, break; + /* + * 'R' A PgStat_ReplSlotStats struct describing a replication + * slot follows. + */ + case 'R': + if (fread(&myReplSlotStats, 1, sizeof(PgStat_ReplSlotStats), fpin) + != sizeof(PgStat_ReplSlotStats)) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + FreeFile(fpin); + return false; + } + break; + case 'E': goto done; @@ -6367,6 +6551,46 @@ pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, int len) } } +/* ---------- + * pgstat_recv_resetreplslotcounter() - + * + * Reset some replication slot statistics of the cluster. + * ---------- + */ +static void +pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, + int len) +{ + int i; + int idx = -1; + TimestampTz ts; + + ts = GetCurrentTimestamp(); + if (msg->clearall) + { + for (i = 0; i < nReplSlotStats; i++) + pgstat_reset_replslot(i, ts); + } + else + { + /* Get the index of replication slot statistics to reset */ + idx = pgstat_replslot_index(msg->m_slotname, false); + + /* + * Nothing to do if the given slot entry is not found. This could + * happen when the slot with the given name is removed and the + * corresponding statistics entry is also removed before receiving the + * reset message. + */ + if (idx < 0) + return; + + /* Reset the stats for the requested replication slot */ + pgstat_reset_replslot(idx, ts); + } +} + + /* ---------- * pgstat_recv_autovac() - * @@ -6626,6 +6850,51 @@ pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len) dbentry->last_checksum_failure = msg->m_failure_time; } +/* ---------- + * pgstat_recv_replslot() - + * + * Process a REPLSLOT message. + * ---------- + */ +static void +pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len) +{ + int idx; + + /* + * Get the index of replication slot statistics. On dropping, we don't + * create the new statistics. + */ + idx = pgstat_replslot_index(msg->m_slotname, !msg->m_drop); + + /* + * The slot entry is not found or there is no space to accommodate the new + * entry. This could happen when the message for the creation of a slot + * reached before the drop message even though the actual operations + * happen in reverse order. In such a case, the next update of the + * statistics for the same slot will create the required entry. + */ + if (idx < 0) + return; + + Assert(idx >= 0 && idx <= max_replication_slots); + if (msg->m_drop) + { + /* Remove the replication slot statistics with the given name */ + memcpy(&replSlotStats[idx], &replSlotStats[nReplSlotStats - 1], + sizeof(PgStat_ReplSlotStats)); + nReplSlotStats--; + Assert(nReplSlotStats >= 0); + } + else + { + /* Update the replication slot statistics */ + replSlotStats[idx].spill_txns += msg->m_spill_txns; + replSlotStats[idx].spill_count += msg->m_spill_count; + replSlotStats[idx].spill_bytes += msg->m_spill_bytes; + } +} + /* ---------- * pgstat_recv_tempfile() - * @@ -6808,6 +7077,57 @@ pgstat_clip_activity(const char *raw_activity) return activity; } +/* ---------- + * pgstat_replslot_index + * + * Return the index of entry of a replication slot with the given name, or + * -1 if the slot is not found. + * + * create_it tells whether to create the new slot entry if it is not found. + * ---------- + */ +static int +pgstat_replslot_index(const char *name, bool create_it) +{ + int i; + + Assert(nReplSlotStats <= max_replication_slots); + for (i = 0; i < nReplSlotStats; i++) + { + if (strcmp(replSlotStats[i].slotname, name) == 0) + return i; /* found */ + } + + /* + * The slot is not found. We don't want to register the new statistics if + * the list is already full or the caller didn't request. + */ + if (i == max_replication_slots || !create_it) + return -1; + + /* Register new slot */ + memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats)); + memcpy(&replSlotStats[nReplSlotStats].slotname, name, NAMEDATALEN); + + return nReplSlotStats++; +} + +/* ---------- + * pgstat_reset_replslot + * + * Reset the replication slot stats at index 'i'. + * ---------- + */ +static void +pgstat_reset_replslot(int i, TimestampTz ts) +{ + /* reset only counters. Don't clear slot name */ + replSlotStats[i].spill_txns = 0; + replSlotStats[i].spill_count = 0; + replSlotStats[i].spill_bytes = 0; + replSlotStats[i].stat_reset_timestamp = ts; +} + /* * pgstat_slru_index * diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index f21f61d5e1..3f84ee99b8 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -650,6 +650,12 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, /* replay actions of all transaction + subtransactions in order */ ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, commit_time, origin_id, origin_lsn); + + /* + * Update the decoding stats at transaction commit/abort. It is not clear + * that sending more or less frequently than this would be better. + */ + UpdateDecodingStats(ctx); } /* @@ -669,6 +675,9 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, } ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr); + + /* update the decoding stats */ + UpdateDecodingStats(ctx); } /* diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 0f6af952f9..3346df32d3 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -32,6 +32,7 @@ #include "access/xlog_internal.h" #include "fmgr.h" #include "miscadmin.h" +#include "pgstat.h" #include "replication/decode.h" #include "replication/logical.h" #include "replication/origin.h" @@ -1460,3 +1461,31 @@ ResetLogicalStreamingState(void) CheckXidAlive = InvalidTransactionId; bsysscan = false; } + +/* + * Report stats for a slot. + */ +void +UpdateDecodingStats(LogicalDecodingContext *ctx) +{ + ReorderBuffer *rb = ctx->reorder; + + /* + * Nothing to do if we haven't spilled anything since the last time the + * stats has been sent. + */ + if (rb->spillBytes <= 0) + return; + + elog(DEBUG2, "UpdateSpillStats: updating stats %p %lld %lld %lld", + rb, + (long long) rb->spillTxns, + (long long) rb->spillCount, + (long long) rb->spillBytes); + + pgstat_report_replslot(NameStr(ctx->slot->data.name), + rb->spillTxns, rb->spillCount, rb->spillBytes); + rb->spillTxns = 0; + rb->spillCount = 0; + rb->spillBytes = 0; +} diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 1975d629a6..189641bbf5 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -343,6 +343,10 @@ ReorderBufferAllocate(void) buffer->outbufsize = 0; buffer->size = 0; + buffer->spillTxns = 0; + buffer->spillCount = 0; + buffer->spillBytes = 0; + buffer->current_restart_decoding_lsn = InvalidXLogRecPtr; dlist_init(&buffer->toplevel_by_lsn); @@ -1579,6 +1583,13 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) { ReorderBufferRestoreCleanup(rb, txn); txn->txn_flags &= ~RBTXN_IS_SERIALIZED; + + /* + * We set this flag to indicate if the transaction is ever serialized. + * We need this to accurately update the stats as otherwise the same + * transaction can be counted as serialized multiple times. + */ + txn->txn_flags |= RBTXN_IS_SERIALIZED_CLEAR; } /* also reset the number of entries in the transaction */ @@ -3112,6 +3123,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) int fd = -1; XLogSegNo curOpenSegNo = 0; Size spilled = 0; + Size size = txn->size; elog(DEBUG2, "spill %u changes in XID %u to disk", (uint32) txn->nentries_mem, txn->xid); @@ -3170,6 +3182,16 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) spilled++; } + /* update the statistics iff we have spilled anything */ + if (spilled) + { + rb->spillCount += 1; + rb->spillBytes += size; + + /* don't consider already serialized transactions */ + rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1; + } + Assert(spilled == txn->nentries_mem); Assert(dlist_is_empty(&txn->changes)); txn->nentries_mem = 0; diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 42c78eabd4..220b4cd6e9 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -99,7 +99,6 @@ ReplicationSlot *MyReplicationSlot = NULL; int max_replication_slots = 0; /* the maximum number of replication * slots */ -static ReplicationSlot *SearchNamedReplicationSlot(const char *name); static int ReplicationSlotAcquireInternal(ReplicationSlot *slot, const char *name, SlotAcquireBehavior behavior); static void ReplicationSlotDropAcquired(void); @@ -314,6 +313,15 @@ ReplicationSlotCreate(const char *name, bool db_specific, LWLockRelease(ReplicationSlotControlLock); + /* + * Create statistics entry for the new logical slot. We don't collect any + * stats for physical slots, so no need to create an entry for the same. + * See ReplicationSlotDropPtr for why we need to do this before releasing + * ReplicationSlotAllocationLock. + */ + if (SlotIsLogical(slot)) + pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0); + /* * Now that the slot has been marked as in_use and active, it's safe to * let somebody else try to allocate a slot. @@ -331,7 +339,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, * * The caller must hold ReplicationSlotControlLock in shared mode. */ -static ReplicationSlot * +ReplicationSlot * SearchNamedReplicationSlot(const char *name) { int i; @@ -683,6 +691,19 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) ereport(WARNING, (errmsg("could not remove directory \"%s\"", tmppath))); + /* + * Send a message to drop the replication slot to the stats collector. + * Since there is no guarantee of the order of message transfer on a UDP + * connection, it's possible that a message for creating a new slot + * reaches before a message for removing the old slot. We send the drop + * and create messages while holding ReplicationSlotAllocationLock to + * reduce that possibility. If the messages reached in reverse, we would + * lose one statistics update message. But the next update message will + * create the statistics for the replication slot. + */ + if (SlotIsLogical(slot)) + pgstat_report_replslot_drop(NameStr(slot->data.name)); + /* * We release this at the very end, so that nobody starts trying to create * a slot while we're still cleaning up the detritus of the old one. diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 24e191ea30..0d0d2e6d2b 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2069,6 +2069,20 @@ pg_stat_reset_slru(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } +/* Reset replication slots stats (a specific one or all of them). */ +Datum +pg_stat_reset_replication_slot(PG_FUNCTION_ARGS) +{ + char *target = NULL; + + if (!PG_ARGISNULL(0)) + target = text_to_cstring(PG_GETARG_TEXT_PP(0)); + + pgstat_reset_replslot_counter(target); + + PG_RETURN_VOID(); +} + Datum pg_stat_get_archiver(PG_FUNCTION_ARGS) { @@ -2134,3 +2148,69 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS) /* Returns the record as Datum */ PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); } + +/* Get the statistics for the replication slots */ +Datum +pg_stat_get_replication_slots(PG_FUNCTION_ARGS) +{ +#define PG_STAT_GET_REPLICATION_SLOT_CLOS 5 + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc; + Tuplestorestate *tupstore; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + PgStat_ReplSlotStats *slotstats; + int nstats; + int i; + + /* check to see if caller supports us returning a tuplestore */ + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot accept a set"))); + if (!(rsinfo->allowedModes & SFRM_Materialize)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not allowed in this context"))); + + /* Build a tuple descriptor for our result type */ + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + tupstore = tuplestore_begin_heap(true, false, work_mem); + rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + + MemoryContextSwitchTo(oldcontext); + + slotstats = pgstat_fetch_replslot(&nstats); + for (i = 0; i < nstats; i++) + { + Datum values[PG_STAT_GET_REPLICATION_SLOT_CLOS]; + bool nulls[PG_STAT_GET_REPLICATION_SLOT_CLOS]; + PgStat_ReplSlotStats *s = &(slotstats[i]); + + MemSet(values, 0, sizeof(values)); + MemSet(nulls, 0, sizeof(nulls)); + + values[0] = PointerGetDatum(cstring_to_text(s->slotname)); + values[1] = Int64GetDatum(s->spill_txns); + values[2] = Int64GetDatum(s->spill_count); + values[3] = Int64GetDatum(s->spill_bytes); + + if (s->stat_reset_timestamp == 0) + nulls[4] = true; + else + values[4] = TimestampTzGetDatum(s->stat_reset_timestamp); + + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + } + + tuplestore_donestoring(tupstore); + + return (Datum) 0; +} diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 1be0179724..584c617284 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202010021 +#define CATALOG_VERSION_NO 202010081 #endif diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index d6f3e2d286..22340baf1c 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5257,6 +5257,14 @@ proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', proargnames => '{pid,status,receive_start_lsn,receive_start_tli,written_lsn,flushed_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,sender_host,sender_port,conninfo}', prosrc => 'pg_stat_get_wal_receiver' }, +{ oid => '8595', descr => 'statistics: information about replication slots', + proname => 'pg_stat_get_replication_slots', prorows => '10', proisstrict => 'f', + proretset => 't', provolatile => 's', proparallel => 'r', + prorettype => 'record', proargtypes => '', + proallargtypes => '{text,int8,int8,int8,timestamptz}', + proargmodes => '{o,o,o,o,o}', + proargnames => '{name,spill_txns,spill_count,spill_bytes,stats_reset}', + prosrc => 'pg_stat_get_replication_slots' }, { oid => '6118', descr => 'statistics: information about subscription', proname => 'pg_stat_get_subscription', proisstrict => 'f', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'oid', @@ -5606,6 +5614,10 @@ descr => 'statistics: reset collected statistics for a single SLRU', proname => 'pg_stat_reset_slru', proisstrict => 'f', provolatile => 'v', prorettype => 'void', proargtypes => 'text', prosrc => 'pg_stat_reset_slru' }, +{ oid => '8596', + descr => 'statistics: reset collected statistics for a single replication slot', + proname => 'pg_stat_reset_replication_slot', proisstrict => 'f', provolatile => 'v', + prorettype => 'void', proargtypes => 'text', prosrc => 'pg_stat_reset_replication_slot' }, { oid => '3163', descr => 'current trigger depth', proname => 'pg_trigger_depth', provolatile => 's', proparallel => 'r', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 343eef507e..a821ff4f15 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -56,6 +56,7 @@ typedef enum StatMsgType PGSTAT_MTYPE_RESETSHAREDCOUNTER, PGSTAT_MTYPE_RESETSINGLECOUNTER, PGSTAT_MTYPE_RESETSLRUCOUNTER, + PGSTAT_MTYPE_RESETREPLSLOTCOUNTER, PGSTAT_MTYPE_AUTOVAC_START, PGSTAT_MTYPE_VACUUM, PGSTAT_MTYPE_ANALYZE, @@ -68,7 +69,8 @@ typedef enum StatMsgType PGSTAT_MTYPE_RECOVERYCONFLICT, PGSTAT_MTYPE_TEMPFILE, PGSTAT_MTYPE_DEADLOCK, - PGSTAT_MTYPE_CHECKSUMFAILURE + PGSTAT_MTYPE_CHECKSUMFAILURE, + PGSTAT_MTYPE_REPLSLOT, } StatMsgType; /* ---------- @@ -358,6 +360,18 @@ typedef struct PgStat_MsgResetslrucounter int m_index; } PgStat_MsgResetslrucounter; +/* ---------- + * PgStat_MsgResetreplslotcounter Sent by the backend to tell the collector + * to reset replication slot counter(s) + * ---------- + */ +typedef struct PgStat_MsgResetreplslotcounter +{ + PgStat_MsgHdr m_hdr; + char m_slotname[NAMEDATALEN]; + bool clearall; +} PgStat_MsgResetreplslotcounter; + /* ---------- * PgStat_MsgAutovacStart Sent by the autovacuum daemon to signal * that a database is going to be processed @@ -465,6 +479,22 @@ typedef struct PgStat_MsgSLRU PgStat_Counter m_truncate; } PgStat_MsgSLRU; +/* ---------- + * PgStat_MsgReplSlot Sent by a backend or a wal sender to update replication + * slot statistics. + * ---------- + */ +typedef struct PgStat_MsgReplSlot +{ + PgStat_MsgHdr m_hdr; + char m_slotname[NAMEDATALEN]; + bool m_drop; + PgStat_Counter m_spill_txns; + PgStat_Counter m_spill_count; + PgStat_Counter m_spill_bytes; +} PgStat_MsgReplSlot; + + /* ---------- * PgStat_MsgRecoveryConflict Sent by the backend upon recovery conflict * ---------- @@ -603,6 +633,7 @@ typedef union PgStat_Msg PgStat_MsgResetsharedcounter msg_resetsharedcounter; PgStat_MsgResetsinglecounter msg_resetsinglecounter; PgStat_MsgResetslrucounter msg_resetslrucounter; + PgStat_MsgResetreplslotcounter msg_resetreplslotcounter; PgStat_MsgAutovacStart msg_autovacuum_start; PgStat_MsgVacuum msg_vacuum; PgStat_MsgAnalyze msg_analyze; @@ -616,6 +647,7 @@ typedef union PgStat_Msg PgStat_MsgDeadlock msg_deadlock; PgStat_MsgTempFile msg_tempfile; PgStat_MsgChecksumFailure msg_checksumfailure; + PgStat_MsgReplSlot msg_replslot; } PgStat_Msg; @@ -627,7 +659,7 @@ typedef union PgStat_Msg * ------------------------------------------------------------ */ -#define PGSTAT_FILE_FORMAT_ID 0x01A5BC9E +#define PGSTAT_FILE_FORMAT_ID 0x01A5BC9F /* ---------- * PgStat_StatDBEntry The collector's data per database @@ -782,6 +814,17 @@ typedef struct PgStat_SLRUStats TimestampTz stat_reset_timestamp; } PgStat_SLRUStats; +/* + * Replication slot statistics kept in the stats collector + */ +typedef struct PgStat_ReplSlotStats +{ + char slotname[NAMEDATALEN]; + PgStat_Counter spill_txns; + PgStat_Counter spill_count; + PgStat_Counter spill_bytes; + TimestampTz stat_reset_timestamp; +} PgStat_ReplSlotStats; /* ---------- * Backend states @@ -1330,6 +1373,7 @@ extern void pgstat_reset_counters(void); extern void pgstat_reset_shared_counters(const char *); extern void pgstat_reset_single_counter(Oid objectid, PgStat_Single_Reset_Type type); extern void pgstat_reset_slru_counter(const char *); +extern void pgstat_reset_replslot_counter(const char *name); extern void pgstat_report_autovac(Oid dboid); extern void pgstat_report_vacuum(Oid tableoid, bool shared, @@ -1342,6 +1386,9 @@ extern void pgstat_report_recovery_conflict(int reason); extern void pgstat_report_deadlock(void); extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount); extern void pgstat_report_checksum_failure(void); +extern void pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount, + int spillbytes); +extern void pgstat_report_replslot_drop(const char *slotname); extern void pgstat_initialize(void); extern void pgstat_bestart(void); @@ -1508,6 +1555,7 @@ extern PgStat_ArchiverStats *pgstat_fetch_stat_archiver(void); extern PgStat_GlobalStats *pgstat_fetch_global(void); extern PgStat_WalStats *pgstat_fetch_stat_wal(void); extern PgStat_SLRUStats *pgstat_fetch_slru(void); +extern PgStat_ReplSlotStats *pgstat_fetch_replslot(int *nslots_p); extern void pgstat_count_slru_page_zeroed(int slru_idx); extern void pgstat_count_slru_page_hit(int slru_idx); diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 45abc444b7..40bab7ee02 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -122,5 +122,6 @@ extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn); extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id); extern void ResetLogicalStreamingState(void); +extern void UpdateDecodingStats(LogicalDecodingContext *ctx); #endif diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 1ae17d5f11..0cc3aebb11 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -162,9 +162,10 @@ typedef struct ReorderBufferChange #define RBTXN_HAS_CATALOG_CHANGES 0x0001 #define RBTXN_IS_SUBXACT 0x0002 #define RBTXN_IS_SERIALIZED 0x0004 -#define RBTXN_IS_STREAMED 0x0008 -#define RBTXN_HAS_TOAST_INSERT 0x0010 -#define RBTXN_HAS_SPEC_INSERT 0x0020 +#define RBTXN_IS_SERIALIZED_CLEAR 0x0008 +#define RBTXN_IS_STREAMED 0x0010 +#define RBTXN_HAS_TOAST_INSERT 0x0020 +#define RBTXN_HAS_SPEC_INSERT 0x0040 /* Does the transaction have catalog changes? */ #define rbtxn_has_catalog_changes(txn) \ @@ -184,6 +185,12 @@ typedef struct ReorderBufferChange ((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \ ) +/* Has this transaction ever been spilled to disk? */ +#define rbtxn_is_serialized_clear(txn) \ +( \ + ((txn)->txn_flags & RBTXN_IS_SERIALIZED_CLEAR) != 0 \ +) + /* This transaction's changes has toast insert, without main table insert. */ #define rbtxn_has_toast_insert(txn) \ ( \ @@ -525,6 +532,17 @@ struct ReorderBuffer /* memory accounting */ Size size; + + /* + * Statistics about transactions spilled to disk. + * + * A single transaction may be spilled repeatedly, which is why we keep + * two different counters. For spilling, the transaction counter includes + * both toplevel transactions and subtransactions. + */ + int64 spillTxns; /* number of transactions spilled to disk */ + int64 spillCount; /* spill-to-disk invocation counter */ + int64 spillBytes; /* amount of data spilled to disk */ }; diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 31362585ec..63bab6967f 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -210,6 +210,7 @@ extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive); extern void ReplicationSlotsDropDBSlots(Oid dboid); extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno); +extern ReplicationSlot *SearchNamedReplicationSlot(const char *name); extern void StartupReplicationSlots(void); extern void CheckPointReplicationSlots(void); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index af4192f9a8..cf2a9b4408 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2018,6 +2018,12 @@ pg_stat_replication| SELECT s.pid, FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid) JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid))) LEFT JOIN pg_authid u ON ((s.usesysid = u.oid))); +pg_stat_replication_slots| SELECT s.name, + s.spill_txns, + s.spill_count, + s.spill_bytes, + s.stats_reset + FROM pg_stat_get_replication_slots() s(name, spill_txns, spill_count, spill_bytes, stats_reset); pg_stat_slru| SELECT s.name, s.blks_zeroed, s.blks_hit, diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 4191f94869..be570329ea 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1832,7 +1832,9 @@ PgStat_MsgFuncstat PgStat_MsgHdr PgStat_MsgInquiry PgStat_MsgRecoveryConflict +PgStat_MsgReplSlot PgStat_MsgResetcounter +PgStat_MsgResetreplslotcounter PgStat_MsgResetsharedcounter PgStat_MsgResetsinglecounter PgStat_MsgResetslrucounter @@ -1842,6 +1844,7 @@ PgStat_MsgTabstat PgStat_MsgTempFile PgStat_MsgVacuum PgStat_MsgWal +PgStat_ReplSlotStats PgStat_SLRUStats PgStat_Shared_Reset_Target PgStat_Single_Reset_Type