diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index bb1a418450..57b228076e 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2517,6 +2517,16 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx false for physical slots. + + + + conflicting bool + + + True if this logical slot conflicted with recovery (and so is now + invalidated). Always NULL for physical slots. + + diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 18e16ae5b3..2390344529 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -6809,7 +6809,9 @@ CreateCheckPoint(int flags) */ XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size); KeepLogSeg(recptr, &_logSegNo); - if (InvalidateObsoleteReplicationSlots(_logSegNo)) + if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED, + _logSegNo, InvalidOid, + InvalidTransactionId)) { /* * Some slots have been invalidated; recalculate the old-segment @@ -7253,7 +7255,9 @@ CreateRestartPoint(int flags) replayPtr = GetXLogReplayRecPtr(&replayTLI); endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr; KeepLogSeg(endptr, &_logSegNo); - if (InvalidateObsoleteReplicationSlots(_logSegNo)) + if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED, + _logSegNo, InvalidOid, + InvalidTransactionId)) { /* * Some slots have been invalidated; recalculate the old-segment diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index c123f10989..ff69983f2e 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1000,7 +1000,8 @@ CREATE VIEW pg_replication_slots AS L.confirmed_flush_lsn, L.wal_status, L.safe_wal_size, - L.two_phase + L.two_phase, + L.conflicting FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 6082d222d5..6ecea3c49c 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -531,6 +531,13 @@ CreateDecodingContext(XLogRecPtr start_lsn, NameStr(MyReplicationSlot->data.name)), errdetail("This slot has been invalidated because it exceeded the maximum reserved size."))); + if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("can no longer get changes from replication slot \"%s\"", + NameStr(MyReplicationSlot->data.name)), + errdetail("This slot has been invalidated because it was conflicting with recovery."))); + Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE); Assert(MyReplicationSlot->data.restart_lsn != InvalidXLogRecPtr); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index f969f7c083..4d0421c5ed 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -1241,8 +1241,58 @@ ReplicationSlotReserveWal(void) } /* - * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot - * and mark it invalid, if necessary and possible. + * Report that replication slot needs to be invalidated + */ +static void +ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, + bool terminating, + int pid, + NameData slotname, + XLogRecPtr restart_lsn, + XLogRecPtr oldestLSN, + TransactionId snapshotConflictHorizon) +{ + StringInfoData err_detail; + bool hint = false; + + initStringInfo(&err_detail); + + switch (cause) + { + case RS_INVAL_WAL_REMOVED: + hint = true; + appendStringInfo(&err_detail, _("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes."), + LSN_FORMAT_ARGS(restart_lsn), + (unsigned long long) (oldestLSN - restart_lsn)); + break; + case RS_INVAL_HORIZON: + appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."), + snapshotConflictHorizon); + break; + + case RS_INVAL_WAL_LEVEL: + appendStringInfo(&err_detail, _("Logical decoding on standby requires wal_level to be at least logical on the primary server")); + break; + case RS_INVAL_NONE: + pg_unreachable(); + } + + ereport(LOG, + terminating ? + errmsg("terminating process %d to release replication slot \"%s\"", + pid, NameStr(slotname)) : + errmsg("invalidating obsolete replication slot \"%s\"", + NameStr(slotname)), + errdetail_internal("%s", err_detail.data), + hint ? errhint("You might need to increase max_slot_wal_keep_size.") : 0); + + pfree(err_detail.data); +} + +/* + * Helper for InvalidateObsoleteReplicationSlots + * + * Acquires the given slot and mark it invalid, if necessary and possible. * * Returns whether ReplicationSlotControlLock was released in the interim (and * in that case we're not holding the lock at return, otherwise we are). @@ -1253,7 +1303,10 @@ ReplicationSlotReserveWal(void) * for syscalls, so caller must restart if we return true. */ static bool -InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, +InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, + ReplicationSlot *s, + XLogRecPtr oldestLSN, + Oid dboid, TransactionId snapshotConflictHorizon, bool *invalidated) { int last_signaled_pid = 0; @@ -1264,6 +1317,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, XLogRecPtr restart_lsn; NameData slotname; int active_pid = 0; + ReplicationSlotInvalidationCause conflict = RS_INVAL_NONE; Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED)); @@ -1286,10 +1340,44 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, restart_lsn = s->data.restart_lsn; /* - * If the slot is already invalid or is fresh enough, we don't need to - * do anything. + * If the slot is already invalid or is a non conflicting slot, we + * don't need to do anything. */ - if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN) + if (s->data.invalidated == RS_INVAL_NONE) + { + switch (cause) + { + case RS_INVAL_WAL_REMOVED: + if (s->data.restart_lsn != InvalidXLogRecPtr && + s->data.restart_lsn < oldestLSN) + conflict = cause; + break; + case RS_INVAL_HORIZON: + if (!SlotIsLogical(s)) + break; + /* invalid DB oid signals a shared relation */ + if (dboid != InvalidOid && dboid != s->data.database) + break; + if (TransactionIdIsValid(s->effective_xmin) && + TransactionIdPrecedesOrEquals(s->effective_xmin, + snapshotConflictHorizon)) + conflict = cause; + else if (TransactionIdIsValid(s->effective_catalog_xmin) && + TransactionIdPrecedesOrEquals(s->effective_catalog_xmin, + snapshotConflictHorizon)) + conflict = cause; + break; + case RS_INVAL_WAL_LEVEL: + if (SlotIsLogical(s)) + conflict = cause; + break; + case RS_INVAL_NONE: + pg_unreachable(); + } + } + + /* if there's no conflict, we're done */ + if (conflict == RS_INVAL_NONE) { SpinLockRelease(&s->mutex); if (released_lock) @@ -1309,13 +1397,14 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, { MyReplicationSlot = s; s->active_pid = MyProcPid; - s->data.invalidated = RS_INVAL_WAL_REMOVED; + s->data.invalidated = conflict; /* * XXX: We should consider not overwriting restart_lsn and instead * just rely on .invalidated. */ - s->data.restart_lsn = InvalidXLogRecPtr; + if (conflict == RS_INVAL_WAL_REMOVED) + s->data.restart_lsn = InvalidXLogRecPtr; /* Let caller know */ *invalidated = true; @@ -1349,13 +1438,9 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, */ if (last_signaled_pid != active_pid) { - ereport(LOG, - errmsg("terminating process %d to release replication slot \"%s\"", - active_pid, NameStr(slotname)), - errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.", - LSN_FORMAT_ARGS(restart_lsn), - (unsigned long long) (oldestLSN - restart_lsn)), - errhint("You might need to increase max_slot_wal_keep_size.")); + ReportSlotInvalidation(conflict, true, active_pid, + slotname, restart_lsn, + oldestLSN, snapshotConflictHorizon); (void) kill(active_pid, SIGTERM); last_signaled_pid = active_pid; @@ -1390,14 +1475,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, ReplicationSlotMarkDirty(); ReplicationSlotSave(); ReplicationSlotRelease(); + pgstat_drop_replslot(s); - ereport(LOG, - errmsg("invalidating obsolete replication slot \"%s\"", - NameStr(slotname)), - errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.", - LSN_FORMAT_ARGS(restart_lsn), - (unsigned long long) (oldestLSN - restart_lsn)), - errhint("You might need to increase max_slot_wal_keep_size.")); + ReportSlotInvalidation(conflict, false, active_pid, + slotname, restart_lsn, + oldestLSN, snapshotConflictHorizon); /* done with this slot for now */ break; @@ -1410,19 +1492,34 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, } /* - * Mark any slot that points to an LSN older than the given segment - * as invalid; it requires WAL that's about to be removed. + * Invalidate slots that require resources about to be removed. * * Returns true when any slot have got invalidated. * + * Whether a slot needs to be invalidated depends on the cause. A slot is + * removed if it: + * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment + * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given + * db; dboid may be InvalidOid for shared relations + * - RS_INVAL_WAL_LEVEL: is logical + * * NB - this runs as part of checkpoint, so avoid raising errors if possible. */ bool -InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno) +InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause, + XLogSegNo oldestSegno, Oid dboid, + TransactionId snapshotConflictHorizon) { XLogRecPtr oldestLSN; bool invalidated = false; + Assert(cause != RS_INVAL_HORIZON || TransactionIdIsValid(snapshotConflictHorizon)); + Assert(cause != RS_INVAL_WAL_REMOVED || oldestSegno > 0); + Assert(cause != RS_INVAL_NONE); + + if (max_replication_slots == 0) + return invalidated; + XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN); restart: @@ -1434,7 +1531,9 @@ restart: if (!s->in_use) continue; - if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated)) + if (InvalidatePossiblyObsoleteSlot(cause, s, oldestLSN, dboid, + snapshotConflictHorizon, + &invalidated)) { /* if the lock was released, start from scratch */ goto restart; diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index ad3e72be5e..6035cf4816 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -232,7 +232,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 14 +#define PG_GET_REPLICATION_SLOTS_COLS 15 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; XLogRecPtr currlsn; int slotno; @@ -402,6 +402,16 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) values[i++] = BoolGetDatum(slot_contents.data.two_phase); + if (slot_contents.data.database == InvalidOid) + nulls[i++] = true; + else + { + if (slot_contents.data.invalidated != RS_INVAL_NONE) + values[i++] = BoolGetDatum(true); + else + values[i++] = BoolGetDatum(false); + } + Assert(i == PG_GET_REPLICATION_SLOTS_COLS); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index e32d50f386..dabe23bbeb 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -57,6 +57,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202304072 +#define CATALOG_VERSION_NO 202304073 #endif diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 6291d76a4c..0e9ce5215b 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11077,9 +11077,9 @@ proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 34ce055dd5..a8a89dc784 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -46,6 +46,10 @@ typedef enum ReplicationSlotInvalidationCause RS_INVAL_NONE, /* required WAL has been removed */ RS_INVAL_WAL_REMOVED, + /* required rows have been removed */ + RS_INVAL_HORIZON, + /* wal_level insufficient for slot */ + RS_INVAL_WAL_LEVEL, } ReplicationSlotInvalidationCause; /* @@ -226,7 +230,10 @@ extern void ReplicationSlotsComputeRequiredLSN(void); extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive); extern void ReplicationSlotsDropDBSlots(Oid dboid); -extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno); +extern bool InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause, + XLogSegNo oldestSegno, + Oid dboid, + TransactionId snapshotConflictHorizon); extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock); extern int ReplicationSlotIndex(ReplicationSlot *slot); extern bool ReplicationSlotName(int index, Name name); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 8337bac5db..3d2405272a 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1472,8 +1472,9 @@ pg_replication_slots| SELECT l.slot_name, l.confirmed_flush_lsn, l.wal_status, l.safe_wal_size, - l.two_phase - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase) + l.two_phase, + l.conflicting + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper,