diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 64614b569c..0d61d98b11 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -9907,6 +9907,44 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
+
+      wal_status
+      text
+
+
+       Availability of WAL files claimed by this slot.
+       Possible values are:
+
+
+        normal means that the claimed files
+        are within max_wal_size
+
+
+        reserved means that max_wal_size
+        is exceeded but the files are still held, either by some replication
+        slot or by wal_keep_segments
+
+
+        lost means that some WAL files are definitely lost
+        and this slot cannot be used to resume replication anymore.
+
+
+       The last two states are seen only when
+       max_slot_wal_keep_size is
+       non-negative.  If restart_lsn is NULL, this
+       field is null.
+
+
+
+
+      min_safe_lsn
+      pg_lsn
+
+
+       The minimum LSN currently available for walsenders.
+
+
+
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index f68c992213..095b3668b8 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3777,6 +3777,29 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
 
+
+      max_slot_wal_keep_size (integer)
+
+       max_slot_wal_keep_size configuration parameter
+
+
+
+       Specify the maximum size of WAL files
+       that replication
+       slots are allowed to retain in the pg_wal
+       directory at checkpoint time.
+       If max_slot_wal_keep_size is -1 (the default),
+       replication slots may retain an unlimited amount of WAL files.  If the
+       restart_lsn of a replication slot falls behind the current LSN by more
+       than this size, the standby using the slot may no longer be able
+       to continue replication due to removal of required WAL files.  You
+       can see the WAL availability of replication slots
+       in pg_replication_slots.
+
+
+
 
      wal_sender_timeout (integer)
 
diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml
index bb5d9962ed..4659b9ef5d 100644
--- a/doc/src/sgml/high-availability.sgml
+++ b/doc/src/sgml/high-availability.sgml
@@ -925,9 +925,11 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
    .  However, these methods often result in retaining more WAL segments than
    required, whereas replication slots retain only the number of segments
-   known to be needed.  An advantage of these methods is that they bound
-   the space requirement for pg_wal; there is currently no way
-   to do this using replication slots.
+   known to be needed.  On the other hand, replication slots can retain so
+   many WAL segments that they fill up the space allocated
+   for pg_wal;
+   max_slot_wal_keep_size limits the size of WAL files
+   retained by replication slots.
 
    Similarly,
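As a sketch of how the documented columns look from SQL (illustration only, not
part of the patch; the slot name rep1 is just an example):

    SELECT slot_name,
           wal_status,      -- 'normal', 'reserved', or 'lost'
           restart_lsn,
           min_safe_lsn
      FROM pg_replication_slots
     WHERE slot_name = 'rep1';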
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 1651e15e89..8a4c1743e5 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -108,6 +108,7 @@ int			wal_level = WAL_LEVEL_MINIMAL;
 int			CommitDelay = 0;	/* precommit delay in microseconds */
 int			CommitSiblings = 5; /* # concurrent xacts needed to sleep */
 int			wal_retrieve_retry_interval = 5000;
+int			max_slot_wal_keep_size_mb = -1;
 
 #ifdef WAL_DEBUG
 bool		XLOG_DEBUG = false;
@@ -759,7 +760,7 @@ static ControlFileData *ControlFile = NULL;
  */
 #define UsableBytesInPage (XLOG_BLCKSZ - SizeOfXLogShortPHD)
 
-/* Convert min_wal_size_mb and max_wal_size_mb to equivalent segment count */
+/* Convert values of GUCs measured in megabytes to equiv. segment count */
 #define ConvertToXSegs(x, segsize)	\
 	(x / ((segsize) / (1024 * 1024)))
 
@@ -3963,9 +3964,10 @@ XLogGetLastRemovedSegno(void)
 	return lastRemovedSegNo;
 }
 
+
 /*
- * Update the last removed segno pointer in shared memory, to reflect
- * that the given XLOG file has been removed.
+ * Update the last removed segno pointer in shared memory, to reflect that the
+ * given XLOG file has been removed.
  */
 static void
 UpdateLastRemovedPtr(char *filename)
@@ -9043,6 +9045,7 @@ CreateCheckPoint(int flags)
 	 */
 	XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
 	KeepLogSeg(recptr, &_logSegNo);
+	InvalidateObsoleteReplicationSlots(_logSegNo);
 	_logSegNo--;
 	RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr);
 
@@ -9377,6 +9380,7 @@ CreateRestartPoint(int flags)
 	replayPtr = GetXLogReplayRecPtr(&replayTLI);
 	endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
 	KeepLogSeg(endptr, &_logSegNo);
+	InvalidateObsoleteReplicationSlots(_logSegNo);
 	_logSegNo--;
 
 	/*
@@ -9445,48 +9449,143 @@ CreateRestartPoint(int flags)
 	return true;
 }
 
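The hunk continues below with the new GetWALAvailability function.  As a rough
SQL-level analogue of what it computes (illustration only, not part of the
patch): compare each slot's restart_lsn lag against the configured cap, much as
the TAP test at the end of this patch does.

    SELECT slot_name,
           pg_size_pretty(pg_current_wal_lsn() - restart_lsn) AS lag,
           current_setting('max_slot_wal_keep_size') AS cap
      FROM pg_replication_slots;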
+/*
+ * Report availability of WAL for the given target LSN
+ *		(typically a slot's restart_lsn)
+ *
+ * Returns one of the following enum values:
+ *
+ * * WALAVAIL_NORMAL means targetLSN is available because it is in the range
+ *   of max_wal_size.
+ *
+ * * WALAVAIL_RESERVED means it is still available by preserving extra
+ *   segments beyond max_wal_size.  If max_slot_wal_keep_size is smaller
+ *   than max_wal_size, this state is not returned.
+ *
+ * * WALAVAIL_REMOVED means it is definitely lost.  A replication stream on
+ *   a slot with this LSN cannot continue.
+ *
+ * * WALAVAIL_INVALID_LSN means the slot hasn't been set to reserve WAL.
+ */
+WALAvailability
+GetWALAvailability(XLogRecPtr targetLSN)
+{
+	XLogRecPtr	currpos;		/* current write LSN */
+	XLogSegNo	currSeg;		/* segid of currpos */
+	XLogSegNo	targetSeg;		/* segid of targetLSN */
+	XLogSegNo	oldestSeg;		/* actual oldest segid */
+	XLogSegNo	oldestSegMaxWalSize;	/* oldest segid kept by max_wal_size */
+	XLogSegNo	oldestSlotSeg = InvalidXLogRecPtr;	/* oldest segid kept by
+													 * slot */
+	uint64		keepSegs;
+
+	/* slot does not reserve WAL.  Either inactive, or has never been active */
+	if (XLogRecPtrIsInvalid(targetLSN))
+		return WALAVAIL_INVALID_LSN;
+
+	currpos = GetXLogWriteRecPtr();
+
+	/* calculate oldest segment currently needed by slots */
+	XLByteToSeg(targetLSN, targetSeg, wal_segment_size);
+	KeepLogSeg(currpos, &oldestSlotSeg);
+
+	/*
+	 * Find the oldest extant segment file.  We get 1 until checkpoint removes
+	 * the first WAL segment file since startup, which causes the status to be
+	 * wrong under certain abnormal conditions, but that does no actual harm.
+	 */
+	oldestSeg = XLogGetLastRemovedSegno() + 1;
+
+	/* calculate oldest segment by max_wal_size and wal_keep_segments */
+	XLByteToSeg(currpos, currSeg, wal_segment_size);
+	keepSegs = ConvertToXSegs(Max(max_wal_size_mb, wal_keep_segments),
+							  wal_segment_size) + 1;
+
+	if (currSeg > keepSegs)
+		oldestSegMaxWalSize = currSeg - keepSegs;
+	else
+		oldestSegMaxWalSize = 1;
+
+	/*
+	 * If max_slot_wal_keep_size has changed after the last call, the segment
+	 * that would have been kept by the current setting might have been lost
+	 * by the previous setting.  No point in showing the "normal" or
+	 * "reserved" status values if the targetSeg is known to be lost.
+	 */
+	if (targetSeg >= oldestSeg)
+	{
+		/*
+		 * show "normal" when targetSeg is within max_wal_size, even if
+		 * max_slot_wal_keep_size is smaller than max_wal_size.
+		 */
+		if ((max_slot_wal_keep_size_mb <= 0 ||
+			 max_slot_wal_keep_size_mb >= max_wal_size_mb) &&
+			oldestSegMaxWalSize <= targetSeg)
+			return WALAVAIL_NORMAL;
+
+		/* being retained by slots */
+		if (oldestSlotSeg <= targetSeg)
+			return WALAVAIL_RESERVED;
+	}
+
+	/* Definitely lost */
+	return WALAVAIL_REMOVED;
+}
+
+
 /*
  * Retreat *logSegNo to the last segment that we need to retain because of
  * either wal_keep_segments or replication slots.
  *
  * This is calculated by subtracting wal_keep_segments from the given xlog
  * location, recptr and by making sure that that result is below the
- * requirement of replication slots.
+ * requirement of replication slots.  For the latter criterion we do consider
+ * the effects of max_slot_wal_keep_size: reserve at most that much space back
+ * from recptr.
  */
 static void
 KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 {
+	XLogSegNo	currSegNo;
 	XLogSegNo	segno;
 	XLogRecPtr	keep;
 
-	XLByteToSeg(recptr, segno, wal_segment_size);
-	keep = XLogGetReplicationSlotMinimumLSN();
+	XLByteToSeg(recptr, currSegNo, wal_segment_size);
+	segno = currSegNo;
 
-	/* compute limit for wal_keep_segments first */
-	if (wal_keep_segments > 0)
+	/*
+	 * Calculate how many segments are kept by slots first, adjusting for
+	 * max_slot_wal_keep_size.
+	 */
+	keep = XLogGetReplicationSlotMinimumLSN();
+	if (keep != InvalidXLogRecPtr)
 	{
-		/* avoid underflow, don't go below 1 */
-		if (segno <= wal_keep_segments)
-			segno = 1;
-		else
-			segno = segno - wal_keep_segments;
+		XLByteToSeg(keep, segno, wal_segment_size);
+
+		/* Cap by max_slot_wal_keep_size ... */
+		if (max_slot_wal_keep_size_mb >= 0)
+		{
+			uint64		slot_keep_segs;
+
+			slot_keep_segs =
+				ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size);
+
+			if (currSegNo - segno > slot_keep_segs)
+				segno = currSegNo - slot_keep_segs;
+		}
 	}
 
-	/* then check whether slots limit removal further */
-	if (max_replication_slots > 0 && keep != InvalidXLogRecPtr)
+	/* but, keep at least wal_keep_segments if that's set */
+	if (wal_keep_segments > 0 && currSegNo - segno < wal_keep_segments)
 	{
-		XLogSegNo	slotSegNo;
-
-		XLByteToSeg(keep, slotSegNo, wal_segment_size);
-
-		if (slotSegNo <= 0)
+		/* avoid underflow, don't go below 1 */
+		if (currSegNo <= wal_keep_segments)
 			segno = 1;
-		else if (slotSegNo < segno)
-			segno = slotSegNo;
+		else
+			segno = currSegNo - wal_keep_segments;
 	}
 
 	/* don't delete WAL segments newer than the calculated segment */
-	if (segno < *logSegNo)
+	if (XLogRecPtrIsInvalid(*logSegNo) || segno < *logSegNo)
 		*logSegNo = segno;
 }
 
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 813ea8bfc3..d406ea8118 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -876,7 +876,9 @@ CREATE VIEW pg_replication_slots AS
             L.xmin,
             L.catalog_xmin,
             L.restart_lsn,
-            L.confirmed_flush_lsn
+            L.confirmed_flush_lsn,
+            L.wal_status,
+            L.min_safe_lsn
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 04510094a8..f5384f1df8 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -225,7 +225,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	else
 		end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);
 
-	ReplicationSlotAcquire(NameStr(*name), true);
+	(void) ReplicationSlotAcquire(NameStr(*name), SAB_Error);
 
 	PG_TRY();
 	{
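A note on the precedence implemented in KeepLogSeg above: the slot requirement
is capped by max_slot_wal_keep_size, while wal_keep_segments acts as a floor
that is always honored.  The relevant live settings can be inspected together
(illustrative query, not part of the patch):

    SELECT name, setting, unit
      FROM pg_settings
     WHERE name IN ('max_slot_wal_keep_size', 'wal_keep_segments', 'max_wal_size');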
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 47851ec4c1..abae74c9a5 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -325,9 +325,15 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 
 /*
  * Find a previously created slot and mark it as used by this backend.
+ *
+ * The return value is only useful if behavior is SAB_Inquire, in which
+ * case it's zero if we successfully acquired the slot, or the PID of the
+ * owning process otherwise.  If behavior is SAB_Error, then trying to
+ * acquire an owned slot is an error.  If SAB_Block, we sleep until the
+ * slot is released by the owning process.
  */
-void
-ReplicationSlotAcquire(const char *name, bool nowait)
+int
+ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
 {
 	ReplicationSlot *slot;
 	int			active_pid;
@@ -392,11 +398,13 @@ retry:
 	 */
 	if (active_pid != MyProcPid)
 	{
-		if (nowait)
+		if (behavior == SAB_Error)
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_IN_USE),
 					 errmsg("replication slot \"%s\" is active for PID %d",
 							name, active_pid)));
+		else if (behavior == SAB_Inquire)
+			return active_pid;
 
 		/* Wait here until we get signaled, and then restart */
 		ConditionVariableSleep(&slot->active_cv,
@@ -412,6 +420,9 @@ retry:
 
 	/* We made this slot active, so it's ours now. */
 	MyReplicationSlot = slot;
+
+	/* success */
+	return 0;
 }
 
 /*
@@ -518,7 +529,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
 {
 	Assert(MyReplicationSlot == NULL);
 
-	ReplicationSlotAcquire(name, nowait);
+	(void) ReplicationSlotAcquire(name, nowait ? SAB_Error : SAB_Block);
 
 	ReplicationSlotDropAcquired();
 }
@@ -743,6 +754,10 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
 
 /*
  * Compute the oldest restart LSN across all slots and inform xlog module.
+ *
+ * Note: while max_slot_wal_keep_size is theoretically relevant for this
+ * purpose, we don't try to account for that, because this module doesn't
+ * know what to compare against.
  */
 void
 ReplicationSlotsComputeRequiredLSN(void)
@@ -818,6 +833,9 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
 		restart_lsn = s->data.restart_lsn;
 		SpinLockRelease(&s->mutex);
 
+		if (restart_lsn == InvalidXLogRecPtr)
+			continue;
+
 		if (result == InvalidXLogRecPtr ||
 			restart_lsn < result)
 			result = restart_lsn;
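With SAB_Error, the SQL-callable paths now fail immediately instead of blocking
when a slot is held by another process.  A hypothetical session (the PID is
made up, 'rep1' is an example slot) would see the error raised above:

    SELECT pg_replication_slot_advance('rep1', pg_current_wal_lsn());
    -- ERROR:  replication slot "rep1" is active for PID 12345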
+/*
+ * Mark any slot that points to an LSN older than the given segment
+ * as invalid; it requires WAL that's about to be removed.
+ *
+ * NB - this runs as part of checkpoint, so avoid raising errors if possible.
+ */
+void
+InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
+{
+	XLogRecPtr	oldestLSN;
+
+	XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
+
+restart:
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (int i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+		XLogRecPtr	restart_lsn = InvalidXLogRecPtr;
+		char	   *slotname;
+
+		if (!s->in_use)
+			continue;
+
+		SpinLockAcquire(&s->mutex);
+		if (s->data.restart_lsn == InvalidXLogRecPtr ||
+			s->data.restart_lsn >= oldestLSN)
+		{
+			SpinLockRelease(&s->mutex);
+			continue;
+		}
+
+		slotname = pstrdup(NameStr(s->data.name));
+		restart_lsn = s->data.restart_lsn;
+
+		SpinLockRelease(&s->mutex);
+		LWLockRelease(ReplicationSlotControlLock);
+
+		for (;;)
+		{
+			int			wspid = ReplicationSlotAcquire(slotname, SAB_Inquire);
+
+			/* no walsender? success! */
+			if (wspid == 0)
+				break;
+
+			ereport(LOG,
+					(errmsg("terminating walsender %d because replication slot \"%s\" is too far behind",
+							wspid, slotname)));
+			(void) kill(wspid, SIGTERM);
+
+			ConditionVariableTimedSleep(&s->active_cv, 10,
+										WAIT_EVENT_REPLICATION_SLOT_DROP);
+		}
+		ConditionVariableCancelSleep();
+
+		ereport(LOG,
+				(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
+						slotname,
+						(uint32) (restart_lsn >> 32),
+						(uint32) restart_lsn)));
+
+		SpinLockAcquire(&s->mutex);
+		s->data.restart_lsn = InvalidXLogRecPtr;
+		SpinLockRelease(&s->mutex);
+		ReplicationSlotRelease();
+
+		/* if we did anything, start from scratch */
+		CHECK_FOR_INTERRUPTS();
+		goto restart;
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+}
+
 /*
  * Flush all replication slots to disk.
  *
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index ce0c9127bc..f776de3df7 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -234,7 +234,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 11
+#define PG_GET_REPLICATION_SLOTS_COLS 13
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -288,6 +288,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		Oid			database;
 		NameData	slot_name;
 		NameData	plugin;
+		WALAvailability walstate;
+		XLogSegNo	last_removed_seg;
 		int			i;
 
 		if (!slot->in_use)
@@ -355,6 +357,40 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		else
 			nulls[i++] = true;
 
+		walstate = GetWALAvailability(restart_lsn);
+
+		switch (walstate)
+		{
+			case WALAVAIL_INVALID_LSN:
+				nulls[i++] = true;
+				break;
+
+			case WALAVAIL_NORMAL:
+				values[i++] = CStringGetTextDatum("normal");
+				break;
+
+			case WALAVAIL_RESERVED:
+				values[i++] = CStringGetTextDatum("reserved");
+				break;
+
+			case WALAVAIL_REMOVED:
+				values[i++] = CStringGetTextDatum("lost");
+				break;
+		}
+
+		if (max_slot_wal_keep_size_mb >= 0 &&
+			(walstate == WALAVAIL_NORMAL || walstate == WALAVAIL_RESERVED) &&
+			((last_removed_seg = XLogGetLastRemovedSegno()) != 0))
+		{
+			XLogRecPtr	min_safe_lsn;
+
+			XLogSegNoOffsetToRecPtr(last_removed_seg + 1, 0,
+									wal_segment_size, min_safe_lsn);
+			values[i++] = Int64GetDatum(min_safe_lsn);
+		}
+		else
+			nulls[i++] = true;
+
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 	}
 	LWLockRelease(ReplicationSlotControlLock);
@@ -377,6 +413,8 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
 	XLogRecPtr	startlsn = MyReplicationSlot->data.restart_lsn;
 	XLogRecPtr	retlsn = startlsn;
 
+	Assert(moveto != InvalidXLogRecPtr);
+
 	if (startlsn < moveto)
 	{
 		SpinLockAcquire(&MyReplicationSlot->mutex);
@@ -414,6 +452,8 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 	ResourceOwner old_resowner = CurrentResourceOwner;
 	XLogRecPtr	retlsn;
 
+	Assert(moveto != InvalidXLogRecPtr);
+
 	PG_TRY();
 	{
 		/*
@@ -552,7 +592,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 	moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID));
 
 	/* Acquire the slot so we "own" it */
-	ReplicationSlotAcquire(NameStr(*slotname), true);
+	(void) ReplicationSlotAcquire(NameStr(*slotname), SAB_Error);
 
 	/* A slot whose restart_lsn has never been reserved cannot be advanced */
 	if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
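Since min_safe_lsn is derived above from the last removed segment plus one, it
marks the start of the oldest WAL still on disk.  A sketch of using it to gauge
a slot's remaining margin (illustration only; the same restart_lsn -
min_safe_lsn arithmetic appears in the TAP test below):

    SELECT slot_name, wal_status,
           pg_size_pretty(restart_lsn - min_safe_lsn) AS margin
      FROM pg_replication_slots
     WHERE min_safe_lsn IS NOT NULL;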
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 9e5611574c..06e8b79036 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -595,7 +595,7 @@ StartReplication(StartReplicationCmd *cmd)
 
 	if (cmd->slotname)
 	{
-		ReplicationSlotAcquire(cmd->slotname, true);
+		(void) ReplicationSlotAcquire(cmd->slotname, SAB_Error);
 		if (SlotIsLogical(MyReplicationSlot))
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1132,7 +1132,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
 	Assert(!MyReplicationSlot);
 
-	ReplicationSlotAcquire(cmd->slotname, true);
+	(void) ReplicationSlotAcquire(cmd->slotname, SAB_Error);
 
 	/*
 	 * Force a disconnect, so that the decoding code doesn't need to care
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 03a22d71ac..5bdc02fce2 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2784,6 +2784,19 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"max_slot_wal_keep_size", PGC_SIGHUP, REPLICATION_SENDING,
+			gettext_noop("Sets the maximum WAL size that can be reserved by replication slots."),
+			gettext_noop("Replication slots will be marked as failed, and segments released "
+						 "for deletion or recycling, if this much space is occupied by WAL "
+						 "on disk."),
+			GUC_UNIT_MB
+		},
+		&max_slot_wal_keep_size_mb,
+		-1, -1, MAX_KILOBYTES,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"wal_sender_timeout", PGC_USERSET, REPLICATION_SENDING,
 			gettext_noop("Sets the maximum time to wait for WAL replication."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 1ae8b77306..995b6ca155 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -289,6 +289,7 @@
 #max_wal_senders = 10		# max number of walsender processes
 				# (change requires restart)
 #wal_keep_segments = 0		# in logfile segments; 0 disables
+#max_slot_wal_keep_size = -1	# in megabytes; -1 disables
 #wal_sender_timeout = 60s	# in milliseconds; 0 disables
 #max_replication_slots = 10	# max number of replication slots
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 7412caa5f2..f60ed2d36c 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -108,6 +108,7 @@ extern int	wal_segment_size;
 extern int	min_wal_size_mb;
 extern int	max_wal_size_mb;
 extern int	wal_keep_segments;
+extern int	max_slot_wal_keep_size_mb;
 extern int	XLOGbuffers;
 extern int	XLogArchiveTimeout;
 extern int	wal_retrieve_retry_interval;
@@ -255,6 +256,17 @@ typedef struct CheckpointStatsData
 
 extern CheckpointStatsData CheckpointStats;
 
+/*
+ * GetWALAvailability return codes
+ */
+typedef enum WALAvailability
+{
+	WALAVAIL_INVALID_LSN,		/* parameter error */
+	WALAVAIL_NORMAL,			/* WAL segment is within max_wal_size */
+	WALAVAIL_RESERVED,			/* WAL segment is reserved by a slot */
+	WALAVAIL_REMOVED			/* WAL segment has been removed */
+} WALAvailability;
+
 struct XLogRecData;
 
 extern XLogRecPtr XLogInsertRecord(struct XLogRecData *rdata,
@@ -305,6 +317,7 @@ extern void ShutdownXLOG(int code, Datum arg);
 extern void InitXLOGAccess(void);
 extern void CreateCheckPoint(int flags);
 extern bool CreateRestartPoint(int flags);
+extern WALAvailability GetWALAvailability(XLogRecPtr restart_lsn);
 extern void XLogPutNextOid(Oid nextOid);
 extern XLogRecPtr XLogRestorePoint(const char *rpName);
 extern void UpdateFullPageWrites(void);
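Because the GUC is declared PGC_SIGHUP above, it can be changed with a reload
rather than a restart.  A minimal sketch (the 10GB value is arbitrary):

    ALTER SYSTEM SET max_slot_wal_keep_size = '10GB';
    SELECT pg_reload_conf();
    SHOW max_slot_wal_keep_size;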
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 7830c02021..27381d7874 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
  */
 
 /*							yyyymmddN */
-#define CATALOG_VERSION_NO	202004072
+#define CATALOG_VERSION_NO	202004073
 
 #endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index c9902fa123..4bce3ad8de 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -10065,9 +10065,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,pg_lsn}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,min_safe_lsn}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 3e95b019b3..149210bae4 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -36,6 +36,14 @@ typedef enum ReplicationSlotPersistency
 	RS_TEMPORARY
 } ReplicationSlotPersistency;
 
+/* For ReplicationSlotAcquire, q.v. */
+typedef enum SlotAcquireBehavior
+{
+	SAB_Error,
+	SAB_Block,
+	SAB_Inquire
+} SlotAcquireBehavior;
+
 /*
  * On-Disk data of a replication slot, preserved across restarts.
  */
@@ -184,7 +192,7 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific,
 extern void ReplicationSlotPersist(void);
 extern void ReplicationSlotDrop(const char *name, bool nowait);
 
-extern void ReplicationSlotAcquire(const char *name, bool nowait);
+extern int	ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(void);
 extern void ReplicationSlotSave(void);
@@ -198,6 +206,7 @@ extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
+extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
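With the catalog bump and the pg_proc.dat change, pg_get_replication_slots()
grows from 11 to 13 output columns, matching PG_GET_REPLICATION_SLOTS_COLS
above.  Callers hitting the function directly, bypassing the view, see the new
fields too (illustration only, not part of the patch):

    SELECT slot_name, wal_status, min_safe_lsn
      FROM pg_get_replication_slots();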
diff --git a/src/test/recovery/t/019_replslot_limit.pl b/src/test/recovery/t/019_replslot_limit.pl
new file mode 100644
index 0000000000..d6bc77e2b5
--- /dev/null
+++ b/src/test/recovery/t/019_replslot_limit.pl
@@ -0,0 +1,217 @@
+# Test for replication slot limit
+# Ensure that max_slot_wal_keep_size limits the number of WAL files to
+# be kept by replication slots.
+use strict;
+use warnings;
+
+use TestLib;
+use PostgresNode;
+
+use File::Path qw(rmtree);
+use Test::More tests => 13;
+use Time::HiRes qw(usleep);
+
+$ENV{PGDATABASE} = 'postgres';
+
+# Initialize master node, setting wal-segsize to 1MB
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, extra => ['--wal-segsize=1']);
+$node_master->append_conf('postgresql.conf', qq(
+min_wal_size = 2MB
+max_wal_size = 4MB
+log_checkpoints = yes
+));
+$node_master->start;
+$node_master->safe_psql('postgres', "SELECT pg_create_physical_replication_slot('rep1')");
+
+# The slot's restart_lsn, wal_status and min_safe_lsn should be NULL before
+# the first connection
+my $result = $node_master->safe_psql('postgres', "SELECT restart_lsn is NULL, wal_status is NULL, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "t|t|t", 'check the state of non-reserved slot is "unknown"');
+
+
+# Take backup
+my $backup_name = 'my_backup';
+$node_master->backup($backup_name);
+
+# Create a standby linking to it using the replication slot
+my $node_standby = get_new_node('standby_1');
+$node_standby->init_from_backup($node_master, $backup_name, has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', "primary_slot_name = 'rep1'");
+
+$node_standby->start;
+
+# Wait until standby has replayed enough data
+my $start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+# Stop standby
+$node_standby->stop;
+
+# Preparation done, the slot is in the state "normal" now
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|t", 'check the catching-up state');
+
+# Advance WAL by one segment (= 1MB) on master
+advance_wal($node_master, 1);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# The slot is always "safe" when it fits within max_wal_size
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|t", 'check that restart_lsn is in max_wal_size');
+
+advance_wal($node_master, 4);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# The slot is always "safe" when max_slot_wal_keep_size is not set
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|t", 'check that slot is working');
+
+# The standby can reconnect to master
+$node_standby->start;
+
+$start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+$node_standby->stop;
+
+# Set max_slot_wal_keep_size on master
+my $max_slot_wal_keep_size_mb = 6;
+$node_master->append_conf('postgresql.conf', qq(
+max_slot_wal_keep_size = ${max_slot_wal_keep_size_mb}MB
+));
+$node_master->reload;
+
+# The slot is in the safe state.  The distance from min_safe_lsn should be
+# almost (max_slot_wal_keep_size - 1) times the segment size
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(pg_current_wal_lsn() - min_safe_lsn) FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|5120 kB", 'check that max_slot_wal_keep_size is working');
+
+# Advance WAL again then checkpoint, reducing remain by 2 MB.
+advance_wal($node_master, 2);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# The slot is still working
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(pg_current_wal_lsn() - min_safe_lsn) FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|2048 kB", 'check that min_safe_lsn gets close to the current LSN');
+
+# The standby can reconnect to master
+$node_standby->start;
+$start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+$node_standby->stop;
+
+# wal_keep_segments overrides max_slot_wal_keep_size
+$result = $node_master->safe_psql('postgres', "ALTER SYSTEM SET wal_keep_segments to 8; SELECT pg_reload_conf();");
+# Advance WAL again, reducing remain by 6 MB.
+advance_wal($node_master, 6);
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(pg_current_wal_lsn() - min_safe_lsn) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|8192 kB", 'check that wal_keep_segments overrides max_slot_wal_keep_size');
+# restore wal_keep_segments
+$result = $node_master->safe_psql('postgres', "ALTER SYSTEM SET wal_keep_segments to 0; SELECT pg_reload_conf();");
+
+# The standby can reconnect to master
+$node_standby->start;
+$start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+$node_standby->stop;
+
+# Advance WAL again without checkpoint, reducing remain by 6 MB.
+advance_wal($node_master, 6);
+
+# Slot gets into 'reserved' state
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(restart_lsn - min_safe_lsn) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|reserved|216 bytes", 'check that the slot state changes to "reserved"');
+
+# Do a checkpoint so that the next checkpoint runs too early
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# Advance WAL again without checkpoint; remain goes to 0.
+advance_wal($node_master, 1);
+
+# Slot gets into 'lost' state
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|lost|t", 'check that the slot state changes to "lost"');
+
+# The standby can still connect to master before a checkpoint
+$node_standby->start;
+
+$start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+$node_standby->stop;
+
+ok(!find_in_log($node_standby,
+	"requested WAL segment [0-9A-F]+ has already been removed"),
+	'check that required WAL segments are still available');
+
+# Advance WAL again, the slot loses the oldest segment.
+my $logstart = get_log_size($node_master);
+advance_wal($node_master, 7);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# The invalidation message (emitted at LOG level) should be issued
+ok(find_in_log($node_master,
+	"invalidating slot \"rep1\" because its restart_lsn [0-9A-F/]+ exceeds max_slot_wal_keep_size",
+	$logstart),
+	'check that the invalidation message is logged');
+
+# This slot should be broken
+$result = $node_master->safe_psql('postgres', "SELECT slot_name, active, restart_lsn, wal_status, min_safe_lsn FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "rep1|f|||", 'check that the slot became inactive');
+
+# The standby can no longer connect to the master
+$logstart = get_log_size($node_standby);
+$node_standby->start;
+
+my $failed = 0;
+for (my $i = 0; $i < 10000; $i++)
+{
+	if (find_in_log($node_standby,
+		"requested WAL segment [0-9A-F]+ has already been removed",
+		$logstart))
+	{
+		$failed = 1;
+		last;
+	}
+	usleep(100_000);
+}
+ok($failed, 'check that replication has been broken');
+
+$node_standby->stop;
+
+#####################################
+# Advance WAL of $node by $n segments
+sub advance_wal
+{
+	my ($node, $n) = @_;
+
+	# Advance by $n segments (= $n MB, since this test uses 1MB segments)
+	for (my $i = 0 ; $i < $n ; $i++)
+	{
+		$node->safe_psql('postgres', "CREATE TABLE t (); DROP TABLE t; SELECT pg_switch_wal();");
+	}
+}
+
+# return the size of logfile of $node in bytes
+sub get_log_size
+{
+	my ($node) = @_;
+
+	return (stat $node->logfile)[7];
+}
+
+# find $pat in logfile of $node after $off-th byte
+sub find_in_log
+{
+	my ($node, $pat, $off) = @_;
+
+	$off = 0 unless defined $off;
+	my $log = TestLib::slurp_file($node->logfile);
+	return 0 if (length($log) <= $off);
+
+	$log = substr($log, $off);
+
+	return $log =~ m/$pat/;
+}
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 6eec8ec568..ac31840739 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1462,8 +1462,10 @@ pg_replication_slots| SELECT l.slot_name,
     l.xmin,
     l.catalog_xmin,
     l.restart_lsn,
-    l.confirmed_flush_lsn
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn)
+    l.confirmed_flush_lsn,
+    l.wal_status,
+    l.min_safe_lsn
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, min_safe_lsn)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
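Condensed to a single query, what the TAP test above exercises: wal_status
should progress from normal through reserved to lost as the write position
outruns the cap, and an invalidated slot reports a NULL restart_lsn.  A
hypothetical monitoring check (not part of the patch):

    SELECT slot_name, active, restart_lsn, wal_status
      FROM pg_replication_slots
     WHERE wal_status = 'lost';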