postgresql/src/backend/utils/activity/pgstat_replslot.c

225 lines
6.0 KiB
C

/* -------------------------------------------------------------------------
*
* pgstat_replslot.c
* Implementation of replication slot statistics.
*
* This file contains the implementation of replication slot statistics. It is kept
* separate from pgstat.c to enforce the line between the statistics access /
* storage implementation and the details about individual types of
* statistics.
*
* Replication slot stats work a bit different than other variable-numbered
* stats. Slots do not have oids (so they can be created on physical
* replicas). Use the slot index as object id while running. However, the slot
* index can change when restarting. That is addressed by using the name when
* (de-)serializing. After a restart it is possible for slots to have been
* dropped while shut down, which is addressed by not restoring stats for
* slots that cannot be found by name when starting up.
*
* Copyright (c) 2001-2023, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/utils/activity/pgstat_replslot.c
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "replication/slot.h"
#include "utils/builtins.h" /* for namestrcpy() */
#include "utils/pgstat_internal.h"
static int get_replslot_index(const char *name);
/*
* Reset counters for a single replication slot.
*
* Permission checking for this function is managed through the normal
* GRANT system.
*/
void
pgstat_reset_replslot(const char *name)
{
ReplicationSlot *slot;
Assert(name != NULL);
/* Check if the slot exits with the given name. */
slot = SearchNamedReplicationSlot(name, true);
if (!slot)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("replication slot \"%s\" does not exist",
name)));
/*
* Nothing to do for physical slots as we collect stats only for logical
* slots.
*/
if (SlotIsPhysical(slot))
return;
/* reset this one entry */
pgstat_reset(PGSTAT_KIND_REPLSLOT, InvalidOid,
ReplicationSlotIndex(slot));
}
/*
* Report replication slot statistics.
*
* We can rely on the stats for the slot to exist and to belong to this
* slot. We can only get here if pgstat_create_replslot() or
* pgstat_acquire_replslot() have already been called.
*/
void
pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *repSlotStat)
{
PgStat_EntryRef *entry_ref;
PgStatShared_ReplSlot *shstatent;
PgStat_StatReplSlotEntry *statent;
entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_REPLSLOT, InvalidOid,
ReplicationSlotIndex(slot), false);
shstatent = (PgStatShared_ReplSlot *) entry_ref->shared_stats;
statent = &shstatent->stats;
/* Update the replication slot statistics */
#define REPLSLOT_ACC(fld) statent->fld += repSlotStat->fld
REPLSLOT_ACC(spill_txns);
REPLSLOT_ACC(spill_count);
REPLSLOT_ACC(spill_bytes);
REPLSLOT_ACC(stream_txns);
REPLSLOT_ACC(stream_count);
REPLSLOT_ACC(stream_bytes);
REPLSLOT_ACC(total_txns);
REPLSLOT_ACC(total_bytes);
#undef REPLSLOT_ACC
pgstat_unlock_entry(entry_ref);
}
/*
* Report replication slot creation.
*
* NB: This gets called with ReplicationSlotAllocationLock already held, be
* careful about calling back into slot.c.
*/
void
pgstat_create_replslot(ReplicationSlot *slot)
{
PgStat_EntryRef *entry_ref;
PgStatShared_ReplSlot *shstatent;
entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_REPLSLOT, InvalidOid,
ReplicationSlotIndex(slot), false);
shstatent = (PgStatShared_ReplSlot *) entry_ref->shared_stats;
/*
* NB: need to accept that there might be stats from an older slot, e.g.
* if we previously crashed after dropping a slot.
*/
memset(&shstatent->stats, 0, sizeof(shstatent->stats));
pgstat_unlock_entry(entry_ref);
}
/*
* Report replication slot has been acquired.
*
* This guarantees that a stats entry exists during later
* pgstat_report_replslot() calls.
*
* If we previously crashed, no stats data exists. But if we did not crash,
* the stats do belong to this slot:
* - the stats cannot belong to a dropped slot, pgstat_drop_replslot() would
* have been called
* - if the slot was removed while shut down,
* pgstat_replslot_from_serialized_name_cb() returning false would have
* caused the stats to be dropped
*/
void
pgstat_acquire_replslot(ReplicationSlot *slot)
{
pgstat_get_entry_ref(PGSTAT_KIND_REPLSLOT, InvalidOid,
ReplicationSlotIndex(slot), true, NULL);
}
/*
* Report replication slot drop.
*/
void
pgstat_drop_replslot(ReplicationSlot *slot)
{
pgstat_drop_entry(PGSTAT_KIND_REPLSLOT, InvalidOid,
ReplicationSlotIndex(slot));
}
/*
* Support function for the SQL-callable pgstat* functions. Returns
* a pointer to the replication slot statistics struct.
*/
PgStat_StatReplSlotEntry *
pgstat_fetch_replslot(NameData slotname)
{
int idx = get_replslot_index(NameStr(slotname));
if (idx == -1)
return NULL;
return (PgStat_StatReplSlotEntry *)
pgstat_fetch_entry(PGSTAT_KIND_REPLSLOT, InvalidOid, idx);
}
void
pgstat_replslot_to_serialized_name_cb(const PgStat_HashKey *key, const PgStatShared_Common *header, NameData *name)
{
/*
* This is only called late during shutdown. The set of existing slots
* isn't allowed to change at this point, we can assume that a slot exists
* at the offset.
*/
if (!ReplicationSlotName(key->objoid, name))
elog(ERROR, "could not find name for replication slot index %u",
key->objoid);
}
bool
pgstat_replslot_from_serialized_name_cb(const NameData *name, PgStat_HashKey *key)
{
int idx = get_replslot_index(NameStr(*name));
/* slot might have been deleted */
if (idx == -1)
return false;
key->kind = PGSTAT_KIND_REPLSLOT;
key->dboid = InvalidOid;
key->objoid = idx;
return true;
}
void
pgstat_replslot_reset_timestamp_cb(PgStatShared_Common *header, TimestampTz ts)
{
((PgStatShared_ReplSlot *) header)->stats.stat_reset_timestamp = ts;
}
static int
get_replslot_index(const char *name)
{
ReplicationSlot *slot;
Assert(name != NULL);
slot = SearchNamedReplicationSlot(name, true);
if (!slot)
return -1;
return ReplicationSlotIndex(slot);
}