diff --git a/contrib/test_decoding/expected/permissions.out b/contrib/test_decoding/expected/permissions.out
index d6eaba8c55..8d100646ce 100644
--- a/contrib/test_decoding/expected/permissions.out
+++ b/contrib/test_decoding/expected/permissions.out
@@ -64,6 +64,9 @@ DETAIL: Only roles with the REPLICATION attribute may use replication slots.
 SELECT pg_drop_replication_slot('regression_slot');
 ERROR: permission denied to use replication slots
 DETAIL: Only roles with the REPLICATION attribute may use replication slots.
+SELECT pg_sync_replication_slots();
+ERROR: permission denied to use replication slots
+DETAIL: Only roles with the REPLICATION attribute may use replication slots.
 RESET ROLE;
 -- replication users can drop superuser created slots
 SET ROLE regress_lr_superuser;
diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out
index 261d8886d3..349ab2d380 100644
--- a/contrib/test_decoding/expected/slot.out
+++ b/contrib/test_decoding/expected/slot.out
@@ -425,6 +425,8 @@ SELECT 'init' FROM pg_create_logical_replication_slot('failover_default_slot', '
  init
 (1 row)
 
+SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_temp_slot', 'test_decoding', true, false, true);
+ERROR: cannot enable failover for a temporary replication slot
 SELECT 'init' FROM pg_create_physical_replication_slot('physical_slot');
  ?column?
 ----------
diff --git a/contrib/test_decoding/sql/permissions.sql b/contrib/test_decoding/sql/permissions.sql
index 312b514593..94db936aee 100644
--- a/contrib/test_decoding/sql/permissions.sql
+++ b/contrib/test_decoding/sql/permissions.sql
@@ -29,6 +29,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
 INSERT INTO lr_test VALUES('lr_superuser_init');
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 SELECT pg_drop_replication_slot('regression_slot');
+SELECT pg_sync_replication_slots();
 RESET ROLE;
 
 -- replication users can drop superuser created slots
diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql
index 45aeae7fd5..580e3ae3be 100644
--- a/contrib/test_decoding/sql/slot.sql
+++ b/contrib/test_decoding/sql/slot.sql
@@ -181,6 +181,7 @@ SELECT pg_drop_replication_slot('copied_slot2_notemp');
 SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_slot', 'test_decoding', false, false, true);
 SELECT 'init' FROM pg_create_logical_replication_slot('failover_false_slot', 'test_decoding', false, false, false);
 SELECT 'init' FROM pg_create_logical_replication_slot('failover_default_slot', 'test_decoding', false, false);
+SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_temp_slot', 'test_decoding', true, false, true);
 SELECT 'init' FROM pg_create_physical_replication_slot('physical_slot');
 
 SELECT slot_name, slot_type, failover FROM pg_replication_slots;
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 61038472c5..037a3b8a64 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4612,8 +4612,13 @@ ANY num_sync ( ),
+        it is also necessary to specify a valid dbname
+        in the primary_conninfo string. This will only be
+        used for slot synchronization. It is ignored for streaming.
         This parameter can only be set in the postgresql.conf
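For reviewers who want to try this out, a standby configuration that satisfies the requirement described above could look like the following sketch (host and user are placeholders; the TAP test added at the end of this patch appends essentially the same settings):

    # postgresql.conf on the standby -- illustrative values only
    primary_conninfo = 'host=primary.example.com user=repluser dbname=postgres'
    primary_slot_name = 'sb1_slot'
    hot_standby_feedback = on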
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 11d537b341..8f147a2417 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -28075,7 +28075,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
-
+
         pg_create_logical_replication_slot
@@ -28444,6 +28444,39 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         record is flushed along with its transaction.
+
+
+
+
+        pg_sync_replication_slots
+
+        pg_sync_replication_slots ()
+        void
+
+
+        Synchronize the logical failover replication slots from the primary
+        server to the standby server. This function can only be executed on the
+        standby server. Temporary synced slots, if any, cannot be used for
+        logical decoding and must be dropped after promotion. See
+         for details.
+
+
+
+        If, after executing the function,
+
+        hot_standby_feedback is disabled on
+        the standby or the physical slot configured in
+
+        primary_slot_name is
+        removed, then it is possible that the necessary rows of the
+        synchronized slot will be removed by the VACUUM process on the primary
+        server, resulting in the synchronized slot becoming invalidated.
+
+
+
+
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index cd152d4ced..eceaaaa273 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -358,6 +358,62 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
      So if a slot is no longer required it should be dropped.
+
+
+
+
+    Replication Slot Synchronization
+
+     The logical replication slots on the primary can be synchronized to
+     the hot standby by using the failover parameter of
+
+     pg_create_logical_replication_slot, or by
+     using the
+     failover option of
+     CREATE SUBSCRIPTION during slot creation, and then calling
+
+     pg_sync_replication_slots
+     on the standby. For the synchronization to work, it is mandatory to
+     have a physical replication slot between the primary and the standby
+     (i.e., primary_slot_name
+     should be configured on the standby), and
+     hot_standby_feedback
+     must be enabled on the standby. It is also necessary to specify a valid
+     dbname in the
+     primary_conninfo.
+
+
+
+     The ability to resume logical replication after failover depends upon the
+     pg_replication_slots.synced
+     value for the synchronized slots on the standby at the time of failover.
+     Only persistent slots that have attained synced state as true on the standby
+     before failover can be used for logical replication after failover.
+     Temporary synced slots cannot be used for logical decoding; therefore,
+     logical replication for those slots cannot be resumed. For example, if the
+     synchronized slot could not become persistent on the standby due to a
+     disabled subscription, then the subscription cannot be resumed after
+     failover even when it is enabled.
+
+
+
+     To resume logical replication after failover from the synced logical
+     slots, the subscription's 'conninfo' must be altered to point to the
+     new primary server. This is done using
+     ALTER SUBSCRIPTION ... CONNECTION.
+     It is recommended that subscriptions are first disabled before promoting
+     the standby and are re-enabled after altering the connection string.
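As a concrete sketch of that recommendation (the subscription name regress_mysub1 is taken from the TAP test below; the connection string is a placeholder):

    -- On the subscriber, before promoting the standby:
    ALTER SUBSCRIPTION regress_mysub1 DISABLE;

    -- After promotion, repoint the subscription to the new primary and
    -- re-enable it:
    ALTER SUBSCRIPTION regress_mysub1
        CONNECTION 'host=new_primary.example.com dbname=postgres';
    ALTER SUBSCRIPTION regress_mysub1 ENABLE;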
+
+
+
+    There is a chance that the old primary is up again during the promotion
+    and if subscriptions are not disabled, the logical subscribers may
+    continue to receive data from the old primary server even after promotion
+    until the connection string is altered. This might result in data
+    inconsistency issues, preventing the logical subscribers from being
+    able to continue replication from the new primary server.
+
+
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 05d6cc42da..a5cb19357f 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2062,7 +2062,8 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
       FAILOVER [ boolean ]
-        If true, the slot is enabled to be synced to the standbys.
+        If true, the slot is enabled to be synced to the standbys
+        so that logical replication can be resumed after failover.
         The default is false.
@@ -2162,7 +2163,8 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
       FAILOVER [ boolean ]
-        If true, the slot is enabled to be synced to the standbys.
+        If true, the slot is enabled to be synced to the standbys
+        so that logical replication can be resumed after failover.
diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index dd468b31ea..be90edd0e2 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2561,10 +2561,26 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
       failover bool
-       True if this is a logical slot enabled to be synced to the standbys.
-       Always false for physical slots.
+       True if this is a logical slot enabled to be synced to the standbys
+       so that logical replication can be resumed from the new primary
+       after failover. Always false for physical slots.
+
+
+      synced bool
+
+
+       True if this is a logical slot that was synced from a primary server.
+       On a hot standby, the slots with the synced column marked as true can
+       neither be used for logical decoding nor dropped manually. The value
+       of this column has no meaning on the primary server; the column value on
+       the primary is default false for all slots but may (if leftover from a
+       promoted standby) also be true.
+
+
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 6791bff9dd..04227a72d1 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1024,7 +1024,8 @@ CREATE VIEW pg_replication_slots AS
             L.safe_wal_size,
             L.two_phase,
             L.conflict_reason,
-            L.failover
+            L.failover,
+            L.synced
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index 2dc25e37bb..ba03eeff1c 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -25,6 +25,7 @@ OBJS = \
 	proto.o \
 	relation.o \
 	reorderbuffer.o \
+	slotsync.o \
 	snapbuild.o \
 	tablesync.o \
 	worker.o
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index ca09c683f1..a53815f2ed 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -524,6 +524,18 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 				 errmsg("replication slot \"%s\" was not created in this database",
 						NameStr(slot->data.name))));
 
+	/*
+	 * Do not allow consumption of a "synchronized" slot until the standby
+	 * gets promoted.
+ */ + if (RecoveryInProgress() && slot->data.synced) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot use replication slot \"%s\" for logical decoding", + NameStr(slot->data.name)), + errdetail("This slot is being synchronized from the primary server."), + errhint("Specify another replication slot.")); + /* * Check if slot has been invalidated due to max_slot_wal_keep_size. Avoid * "cannot get changes" wording in this errmsg because that'd be diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index 1050eb2c09..3dec36a6de 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -11,6 +11,7 @@ backend_sources += files( 'proto.c', 'relation.c', 'reorderbuffer.c', + 'slotsync.c', 'snapbuild.c', 'tablesync.c', 'worker.c', diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c new file mode 100644 index 0000000000..0aa1bf1ad2 --- /dev/null +++ b/src/backend/replication/logical/slotsync.c @@ -0,0 +1,906 @@ +/*------------------------------------------------------------------------- + * slotsync.c + * Functionality for synchronizing slots to a standby server from the + * primary server. + * + * Copyright (c) 2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/slotsync.c + * + * This file contains the code for slot synchronization on a physical standby + * to fetch logical failover slots information from the primary server, create + * the slots on the standby and synchronize them. This is done by a call to SQL + * function pg_sync_replication_slots. + * + * If on physical standby, the WAL corresponding to the remote's restart_lsn + * is not available or the remote's catalog_xmin precedes the oldest xid for which + * it is guaranteed that rows wouldn't have been removed then we cannot create + * the local standby slot because that would mean moving the local slot + * backward and decoding won't be possible via such a slot. In this case, the + * slot will be marked as RS_TEMPORARY. Once the primary server catches up, + * the slot will be marked as RS_PERSISTENT (which means sync-ready) after + * which we can call pg_sync_replication_slots() periodically to perform + * syncs. + * + * Any standby synchronized slots will be dropped if they no longer need + * to be synchronized. See comment atop drop_local_obsolete_slots() for more + * details. + *--------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/xlog_internal.h" +#include "access/xlogrecovery.h" +#include "catalog/pg_database.h" +#include "commands/dbcommands.h" +#include "replication/logical.h" +#include "replication/slotsync.h" +#include "storage/ipc.h" +#include "storage/lmgr.h" +#include "storage/procarray.h" +#include "utils/builtins.h" +#include "utils/pg_lsn.h" + +/* Struct for sharing information to control slot synchronization. */ +typedef struct SlotSyncCtxStruct +{ + /* prevents concurrent slot syncs to avoid slot overwrites */ + bool syncing; + slock_t mutex; +} SlotSyncCtxStruct; + +SlotSyncCtxStruct *SlotSyncCtx = NULL; + +/* + * Flag to tell if we are syncing replication slots. Unlike the 'syncing' flag + * in SlotSyncCtxStruct, this flag is true only if the current process is + * performing slot synchronization. 
+ */ +static bool syncing_slots = false; + +/* + * Structure to hold information fetched from the primary server about a logical + * replication slot. + */ +typedef struct RemoteSlot +{ + char *name; + char *plugin; + char *database; + bool two_phase; + bool failover; + XLogRecPtr restart_lsn; + XLogRecPtr confirmed_lsn; + TransactionId catalog_xmin; + + /* RS_INVAL_NONE if valid, or the reason of invalidation */ + ReplicationSlotInvalidationCause invalidated; +} RemoteSlot; + +/* + * If necessary, update the local synced slot's metadata based on the data + * from the remote slot. + * + * If no update was needed (the data of the remote slot is the same as the + * local slot) return false, otherwise true. + */ +static bool +update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) +{ + ReplicationSlot *slot = MyReplicationSlot; + bool xmin_changed; + bool restart_lsn_changed; + NameData plugin_name; + + Assert(slot->data.invalidated == RS_INVAL_NONE); + + xmin_changed = (remote_slot->catalog_xmin != slot->data.catalog_xmin); + restart_lsn_changed = (remote_slot->restart_lsn != slot->data.restart_lsn); + + if (!xmin_changed && + !restart_lsn_changed && + remote_dbid == slot->data.database && + remote_slot->two_phase == slot->data.two_phase && + remote_slot->failover == slot->data.failover && + remote_slot->confirmed_lsn == slot->data.confirmed_flush && + strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) == 0) + return false; + + /* Avoid expensive operations while holding a spinlock. */ + namestrcpy(&plugin_name, remote_slot->plugin); + + SpinLockAcquire(&slot->mutex); + slot->data.plugin = plugin_name; + slot->data.database = remote_dbid; + slot->data.two_phase = remote_slot->two_phase; + slot->data.failover = remote_slot->failover; + slot->data.restart_lsn = remote_slot->restart_lsn; + slot->data.confirmed_flush = remote_slot->confirmed_lsn; + slot->data.catalog_xmin = remote_slot->catalog_xmin; + slot->effective_catalog_xmin = remote_slot->catalog_xmin; + SpinLockRelease(&slot->mutex); + + if (xmin_changed) + ReplicationSlotsComputeRequiredXmin(false); + + if (restart_lsn_changed) + ReplicationSlotsComputeRequiredLSN(); + + return true; +} + +/* + * Get the list of local logical slots that are synchronized from the + * primary server. + */ +static List * +get_local_synced_slots(void) +{ + List *local_slots = NIL; + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + + for (int i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + /* Check if it is a synchronized slot */ + if (s->in_use && s->data.synced) + { + Assert(SlotIsLogical(s)); + local_slots = lappend(local_slots, s); + } + } + + LWLockRelease(ReplicationSlotControlLock); + + return local_slots; +} + +/* + * Helper function to check if local_slot is required to be retained. + * + * Return false either if local_slot does not exist in the remote_slots list + * or is invalidated while the corresponding remote slot is still valid, + * otherwise true. + */ +static bool +local_sync_slot_required(ReplicationSlot *local_slot, List *remote_slots) +{ + bool remote_exists = false; + bool locally_invalidated = false; + + foreach_ptr(RemoteSlot, remote_slot, remote_slots) + { + if (strcmp(remote_slot->name, NameStr(local_slot->data.name)) == 0) + { + remote_exists = true; + + /* + * If remote slot is not invalidated but local slot is marked as + * invalidated, then set locally_invalidated flag. 
+ */ + SpinLockAcquire(&local_slot->mutex); + locally_invalidated = + (remote_slot->invalidated == RS_INVAL_NONE) && + (local_slot->data.invalidated != RS_INVAL_NONE); + SpinLockRelease(&local_slot->mutex); + + break; + } + } + + return (remote_exists && !locally_invalidated); +} + +/* + * Drop local obsolete slots. + * + * Drop the local slots that no longer need to be synced i.e. these either do + * not exist on the primary or are no longer enabled for failover. + * + * Additionally, drop any slots that are valid on the primary but got + * invalidated on the standby. This situation may occur due to the following + * reasons: + * - The 'max_slot_wal_keep_size' on the standby is insufficient to retain WAL + * records from the restart_lsn of the slot. + * - 'primary_slot_name' is temporarily reset to null and the physical slot is + * removed. + * These dropped slots will get recreated in next sync-cycle and it is okay to + * drop and recreate such slots as long as these are not consumable on the + * standby (which is the case currently). + * + * Note: Change of 'wal_level' on the primary server to a level lower than + * logical may also result in slot invalidation and removal on the standby. + * This is because such 'wal_level' change is only possible if the logical + * slots are removed on the primary server, so it's expected to see the + * slots being invalidated and removed on the standby too (and re-created + * if they are re-created on the primary server). + */ +static void +drop_local_obsolete_slots(List *remote_slot_list) +{ + List *local_slots = get_local_synced_slots(); + + foreach_ptr(ReplicationSlot, local_slot, local_slots) + { + /* Drop the local slot if it is not required to be retained. */ + if (!local_sync_slot_required(local_slot, remote_slot_list)) + { + bool synced_slot; + + /* + * Use shared lock to prevent a conflict with + * ReplicationSlotsDropDBSlots(), trying to drop the same slot + * during a drop-database operation. + */ + LockSharedObject(DatabaseRelationId, local_slot->data.database, + 0, AccessShareLock); + + /* + * In the small window between getting the slot to drop and + * locking the database, there is a possibility of a parallel + * database drop by the startup process and the creation of a new + * slot by the user. This new user-created slot may end up using + * the same shared memory as that of 'local_slot'. Thus check if + * local_slot is still the synced one before performing actual + * drop. + */ + SpinLockAcquire(&local_slot->mutex); + synced_slot = local_slot->in_use && local_slot->data.synced; + SpinLockRelease(&local_slot->mutex); + + if (synced_slot) + { + ReplicationSlotAcquire(NameStr(local_slot->data.name), true); + ReplicationSlotDropAcquired(); + } + + UnlockSharedObject(DatabaseRelationId, local_slot->data.database, + 0, AccessShareLock); + + ereport(LOG, + errmsg("dropped replication slot \"%s\" of dbid %d", + NameStr(local_slot->data.name), + local_slot->data.database)); + } + } +} + +/* + * Reserve WAL for the currently active local slot using the specified WAL + * location (restart_lsn). + * + * If the given WAL location has been removed, reserve WAL using the oldest + * existing WAL segment. 
+ */ +static void +reserve_wal_for_local_slot(XLogRecPtr restart_lsn) +{ + XLogSegNo oldest_segno; + XLogSegNo segno; + ReplicationSlot *slot = MyReplicationSlot; + + Assert(slot != NULL); + Assert(XLogRecPtrIsInvalid(slot->data.restart_lsn)); + + while (true) + { + SpinLockAcquire(&slot->mutex); + slot->data.restart_lsn = restart_lsn; + SpinLockRelease(&slot->mutex); + + /* Prevent WAL removal as fast as possible */ + ReplicationSlotsComputeRequiredLSN(); + + XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size); + + /* + * Find the oldest existing WAL segment file. + * + * Normally, we can determine it by using the last removed segment + * number. However, if no WAL segment files have been removed by a + * checkpoint since startup, we need to search for the oldest segment + * file from the current timeline existing in XLOGDIR. + * + * XXX: Currently, we are searching for the oldest segment in the + * current timeline as there is less chance of the slot's restart_lsn + * from being some prior timeline, and even if it happens, in the + * worst case, we will wait to sync till the slot's restart_lsn moved + * to the current timeline. + */ + oldest_segno = XLogGetLastRemovedSegno() + 1; + + if (oldest_segno == 1) + { + TimeLineID cur_timeline; + + GetWalRcvFlushRecPtr(NULL, &cur_timeline); + oldest_segno = XLogGetOldestSegno(cur_timeline); + } + + /* + * If all required WAL is still there, great, otherwise retry. The + * slot should prevent further removal of WAL, unless there's a + * concurrent ReplicationSlotsComputeRequiredLSN() after we've written + * the new restart_lsn above, so normally we should never need to loop + * more than twice. + */ + if (segno >= oldest_segno) + break; + + /* Retry using the location of the oldest wal segment */ + XLogSegNoOffsetToRecPtr(oldest_segno, 0, wal_segment_size, restart_lsn); + } +} + +/* + * If the remote restart_lsn and catalog_xmin have caught up with the + * local ones, then update the LSNs and persist the local synced slot for + * future synchronization; otherwise, do nothing. + */ +static void +update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) +{ + ReplicationSlot *slot = MyReplicationSlot; + + /* + * Check if the primary server has caught up. Refer to the comment atop + * the file for details on this check. + */ + if (remote_slot->restart_lsn < slot->data.restart_lsn || + TransactionIdPrecedes(remote_slot->catalog_xmin, + slot->data.catalog_xmin)) + { + /* + * The remote slot didn't catch up to locally reserved position. + * + * We do not drop the slot because the restart_lsn can be ahead of the + * current location when recreating the slot in the next cycle. It may + * take more time to create such a slot. Therefore, we keep this slot + * and attempt the synchronization in the next cycle. + */ + return; + } + + /* First time slot update, the function must return true */ + if (!update_local_synced_slot(remote_slot, remote_dbid)) + elog(ERROR, "failed to update slot"); + + ReplicationSlotPersist(); + + ereport(LOG, + errmsg("newly created slot \"%s\" is sync-ready now", + remote_slot->name)); +} + +/* + * Synchronize a single slot to the given position. + * + * This creates a new slot if there is no existing one and updates the + * metadata of the slot as per the data received from the primary server. + * + * The slot is created as a temporary slot and stays in the same state until the + * the remote_slot catches up with locally reserved position and local slot is + * updated. 
+ * The slot is then persisted and is considered as sync-ready for
+ * periodic syncs.
+ */
+static void
+synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+{
+	ReplicationSlot *slot;
+	XLogRecPtr	latestFlushPtr;
+
+	/*
+	 * Make sure that concerned WAL is received and flushed before syncing
+	 * slot to target lsn received from the primary server.
+	 */
+	latestFlushPtr = GetStandbyFlushRecPtr(NULL);
+	if (remote_slot->confirmed_lsn > latestFlushPtr)
+		elog(ERROR,
+			 "skipping slot synchronization as the received slot sync"
+			 " LSN %X/%X for slot \"%s\" is ahead of the standby position %X/%X",
+			 LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
+			 remote_slot->name,
+			 LSN_FORMAT_ARGS(latestFlushPtr));
+
+	/* Search for the named slot */
+	if ((slot = SearchNamedReplicationSlot(remote_slot->name, true)))
+	{
+		bool		synced;
+
+		SpinLockAcquire(&slot->mutex);
+		synced = slot->data.synced;
+		SpinLockRelease(&slot->mutex);
+
+		/* User-created slot with the same name exists, raise ERROR. */
+		if (!synced)
+			ereport(ERROR,
+					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					errmsg("exiting from slot synchronization because same"
+						   " name slot \"%s\" already exists on the standby",
+						   remote_slot->name));
+
+		/*
+		 * The slot has been synchronized before.
+		 *
+		 * It is important to acquire the slot here before checking
+		 * invalidation. If we don't acquire the slot first, there could be a
+		 * race condition that the local slot could be invalidated just after
+		 * checking the 'invalidated' flag here and we could end up
+		 * overwriting 'invalidated' flag to remote_slot's value. See
+		 * InvalidatePossiblyObsoleteSlot() where it invalidates slot directly
+		 * if the slot is not acquired by other processes.
+		 */
+		ReplicationSlotAcquire(remote_slot->name, true);
+
+		Assert(slot == MyReplicationSlot);
+
+		/*
+		 * Copy the invalidation cause from remote only if local slot is not
+		 * invalidated locally; we don't want to overwrite the existing one.
+		 */
+		if (slot->data.invalidated == RS_INVAL_NONE &&
+			remote_slot->invalidated != RS_INVAL_NONE)
+		{
+			SpinLockAcquire(&slot->mutex);
+			slot->data.invalidated = remote_slot->invalidated;
+			SpinLockRelease(&slot->mutex);
+
+			/* Make sure the invalidated state persists across server restart */
+			ReplicationSlotMarkDirty();
+			ReplicationSlotSave();
+		}
+
+		/* Skip the sync of an invalidated slot */
+		if (slot->data.invalidated != RS_INVAL_NONE)
+		{
+			ReplicationSlotRelease();
+			return;
+		}
+
+		/* Slot not ready yet, let's attempt to make it sync-ready now. */
+		if (slot->data.persistency == RS_TEMPORARY)
+		{
+			update_and_persist_local_synced_slot(remote_slot, remote_dbid);
+		}
+
+		/* Slot ready for sync, so sync it. */
+		else
+		{
+			/*
+			 * Sanity check: As long as the invalidations are handled
+			 * appropriately as above, this should never happen.
+			 */
+			if (remote_slot->restart_lsn < slot->data.restart_lsn)
+				elog(ERROR,
+					 "cannot synchronize local slot \"%s\" LSN(%X/%X)"
+					 " to remote slot's LSN(%X/%X) as synchronization"
+					 " would move it backwards", remote_slot->name,
+					 LSN_FORMAT_ARGS(slot->data.restart_lsn),
+					 LSN_FORMAT_ARGS(remote_slot->restart_lsn));
+
+			/* Make sure the slot changes persist across server restart */
+			if (update_local_synced_slot(remote_slot, remote_dbid))
+			{
+				ReplicationSlotMarkDirty();
+				ReplicationSlotSave();
+			}
+		}
+	}
+	/* Otherwise create the slot first. */
+	else
+	{
+		NameData	plugin_name;
+		TransactionId xmin_horizon = InvalidTransactionId;
+
+		/* Skip creating the local slot if remote_slot is invalidated already */
+		if (remote_slot->invalidated != RS_INVAL_NONE)
+			return;
+
+		/*
+		 * We create temporary slots instead of ephemeral slots here because
+		 * we want the slots to survive after releasing them. This is done to
+		 * avoid dropping and re-creating the slots in each synchronization
+		 * cycle if the restart_lsn or catalog_xmin of the remote slot has not
+		 * caught up.
+		 */
+		ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
+							  remote_slot->two_phase,
+							  remote_slot->failover,
+							  true);
+
+		/* For shorter lines. */
+		slot = MyReplicationSlot;
+
+		/* Avoid expensive operations while holding a spinlock. */
+		namestrcpy(&plugin_name, remote_slot->plugin);
+
+		SpinLockAcquire(&slot->mutex);
+		slot->data.database = remote_dbid;
+		slot->data.plugin = plugin_name;
+		SpinLockRelease(&slot->mutex);
+
+		reserve_wal_for_local_slot(remote_slot->restart_lsn);
+
+		LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+		xmin_horizon = GetOldestSafeDecodingTransactionId(true);
+		SpinLockAcquire(&slot->mutex);
+		slot->effective_catalog_xmin = xmin_horizon;
+		slot->data.catalog_xmin = xmin_horizon;
+		SpinLockRelease(&slot->mutex);
+		ReplicationSlotsComputeRequiredXmin(true);
+		LWLockRelease(ProcArrayLock);
+
+		update_and_persist_local_synced_slot(remote_slot, remote_dbid);
+	}
+
+	ReplicationSlotRelease();
+}
+
+/*
+ * Synchronize slots.
+ *
+ * Gets the failover logical slots info from the primary server and updates
+ * the slots locally. Creates the slots if not present on the standby.
+ */
+static void
+synchronize_slots(WalReceiverConn *wrconn)
+{
+#define SLOTSYNC_COLUMN_COUNT 9
+	Oid			slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
+	LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID};
+
+	WalRcvExecResult *res;
+	TupleTableSlot *tupslot;
+	StringInfoData s;
+	List	   *remote_slot_list = NIL;
+
+	SpinLockAcquire(&SlotSyncCtx->mutex);
+	if (SlotSyncCtx->syncing)
+	{
+		SpinLockRelease(&SlotSyncCtx->mutex);
+		ereport(ERROR,
+				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("cannot synchronize replication slots concurrently"));
+	}
+
+	SlotSyncCtx->syncing = true;
+	SpinLockRelease(&SlotSyncCtx->mutex);
+
+	syncing_slots = true;
+
+	initStringInfo(&s);
+
+	/* Construct query to fetch slots with failover enabled. */
+	appendStringInfo(&s,
+					 "SELECT slot_name, plugin, confirmed_flush_lsn,"
+					 " restart_lsn, catalog_xmin, two_phase, failover,"
+					 " database, conflict_reason"
+					 " FROM pg_catalog.pg_replication_slots"
+					 " WHERE failover and NOT temporary");
+
+	/* Execute the query */
+	res = walrcv_exec(wrconn, s.data, SLOTSYNC_COLUMN_COUNT, slotRow);
+	pfree(s.data);
+
+	if (res->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				errmsg("could not fetch failover logical slots info from the primary server: %s",
+					   res->err));
+
+	/* Construct the remote_slot tuple and synchronize each slot locally */
+	tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+	while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
+	{
+		bool		isnull;
+		RemoteSlot *remote_slot = palloc0(sizeof(RemoteSlot));
+		Datum		d;
+		int			col = 0;
+
+		remote_slot->name = TextDatumGetCString(slot_getattr(tupslot, ++col,
+															 &isnull));
+		Assert(!isnull);
+
+		remote_slot->plugin = TextDatumGetCString(slot_getattr(tupslot, ++col,
+															   &isnull));
+		Assert(!isnull);
+
+		/*
+		 * It is possible to get null values for LSN and Xmin if slot is
+		 * invalidated on the primary server, so handle accordingly.
+		 */
+		d = slot_getattr(tupslot, ++col, &isnull);
+		remote_slot->confirmed_lsn = isnull ? InvalidXLogRecPtr :
+			DatumGetLSN(d);
+
+		d = slot_getattr(tupslot, ++col, &isnull);
+		remote_slot->restart_lsn = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
+
+		d = slot_getattr(tupslot, ++col, &isnull);
+		remote_slot->catalog_xmin = isnull ? InvalidTransactionId :
+			DatumGetTransactionId(d);
+
+		remote_slot->two_phase = DatumGetBool(slot_getattr(tupslot, ++col,
+														   &isnull));
+		Assert(!isnull);
+
+		remote_slot->failover = DatumGetBool(slot_getattr(tupslot, ++col,
+														  &isnull));
+		Assert(!isnull);
+
+		remote_slot->database = TextDatumGetCString(slot_getattr(tupslot,
+																 ++col, &isnull));
+		Assert(!isnull);
+
+		d = slot_getattr(tupslot, ++col, &isnull);
+		remote_slot->invalidated = isnull ? RS_INVAL_NONE :
+			GetSlotInvalidationCause(TextDatumGetCString(d));
+
+		/* Sanity check */
+		Assert(col == SLOTSYNC_COLUMN_COUNT);
+
+		/*
+		 * If restart_lsn, confirmed_lsn or catalog_xmin is invalid but the
+		 * slot is valid, that means we have fetched the remote_slot in its
+		 * RS_EPHEMERAL state. In such a case, don't sync it; we can always
+		 * sync it in the next sync cycle when the remote_slot is persisted
+		 * and has valid lsn(s) and xmin values.
+		 *
+		 * XXX: In future, if we plan to expose 'slot->data.persistency' in
+		 * pg_replication_slots view, then we can avoid fetching RS_EPHEMERAL
+		 * slots in the first place.
+		 */
+		if ((XLogRecPtrIsInvalid(remote_slot->restart_lsn) ||
+			 XLogRecPtrIsInvalid(remote_slot->confirmed_lsn) ||
+			 !TransactionIdIsValid(remote_slot->catalog_xmin)) &&
+			remote_slot->invalidated == RS_INVAL_NONE)
+			pfree(remote_slot);
+		else
+			/* Create list of remote slots */
+			remote_slot_list = lappend(remote_slot_list, remote_slot);
+
+		ExecClearTuple(tupslot);
+	}
+
+	/* Drop local slots that no longer need to be synced. */
+	drop_local_obsolete_slots(remote_slot_list);
+
+	/* Now sync the slots locally */
+	foreach_ptr(RemoteSlot, remote_slot, remote_slot_list)
+	{
+		Oid			remote_dbid = get_database_oid(remote_slot->database, false);
+
+		/*
+		 * Use shared lock to prevent a conflict with
+		 * ReplicationSlotsDropDBSlots(), trying to drop the same slot during
+		 * a drop-database operation.
+ */ + LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock); + + synchronize_one_slot(remote_slot, remote_dbid); + + UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock); + } + + /* We are done, free remote_slot_list elements */ + list_free_deep(remote_slot_list); + + walrcv_clear_result(res); + + SpinLockAcquire(&SlotSyncCtx->mutex); + SlotSyncCtx->syncing = false; + SpinLockRelease(&SlotSyncCtx->mutex); + + syncing_slots = false; +} + +/* + * Checks the remote server info. + * + * We ensure that the 'primary_slot_name' exists on the remote server and the + * remote server is not a standby node. + */ +static void +validate_remote_info(WalReceiverConn *wrconn) +{ +#define PRIMARY_INFO_OUTPUT_COL_COUNT 2 + WalRcvExecResult *res; + Oid slotRow[PRIMARY_INFO_OUTPUT_COL_COUNT] = {BOOLOID, BOOLOID}; + StringInfoData cmd; + bool isnull; + TupleTableSlot *tupslot; + bool remote_in_recovery; + bool primary_slot_valid; + + initStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT pg_is_in_recovery(), count(*) = 1" + " FROM pg_catalog.pg_replication_slots" + " WHERE slot_type='physical' AND slot_name=%s", + quote_literal_cstr(PrimarySlotName)); + + res = walrcv_exec(wrconn, cmd.data, PRIMARY_INFO_OUTPUT_COL_COUNT, slotRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + errmsg("could not fetch primary_slot_name \"%s\" info from the primary server: %s", + PrimarySlotName, res->err), + errhint("Check if \"primary_slot_name\" is configured correctly.")); + + tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + if (!tuplestore_gettupleslot(res->tuplestore, true, false, tupslot)) + elog(ERROR, + "failed to fetch tuple for the primary server slot specified by \"primary_slot_name\""); + + remote_in_recovery = DatumGetBool(slot_getattr(tupslot, 1, &isnull)); + Assert(!isnull); + + if (remote_in_recovery) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot synchronize replication slots from a standby server")); + + primary_slot_valid = DatumGetBool(slot_getattr(tupslot, 2, &isnull)); + Assert(!isnull); + + if (!primary_slot_valid) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("bad configuration for slot synchronization"), + /* translator: second %s is a GUC variable name */ + errdetail("The replication slot \"%s\" specified by \"%s\" does not exist on the primary server.", + PrimarySlotName, "primary_slot_name")); + + ExecClearTuple(tupslot); + walrcv_clear_result(res); +} + +/* + * Check all necessary GUCs for slot synchronization are set + * appropriately, otherwise, raise ERROR. + */ +void +ValidateSlotSyncParams(void) +{ + char *dbname; + + /* + * A physical replication slot(primary_slot_name) is required on the + * primary to ensure that the rows needed by the standby are not removed + * after restarting, so that the synchronized slot on the standby will not + * be invalidated. + */ + if (PrimarySlotName == NULL || *PrimarySlotName == '\0') + ereport(ERROR, + /* translator: %s is a GUC variable name */ + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("bad configuration for slot synchronization"), + errhint("\"%s\" must be defined.", "primary_slot_name")); + + /* + * hot_standby_feedback must be enabled to cooperate with the physical + * replication slot, which allows informing the primary about the xmin and + * catalog_xmin values on the standby. 
+ */ + if (!hot_standby_feedback) + ereport(ERROR, + /* translator: %s is a GUC variable name */ + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("bad configuration for slot synchronization"), + errhint("\"%s\" must be enabled.", "hot_standby_feedback")); + + /* Logical slot sync/creation requires wal_level >= logical. */ + if (wal_level < WAL_LEVEL_LOGICAL) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("bad configuration for slot synchronization"), + errhint("\"wal_level\" must be >= logical.")); + + /* + * The primary_conninfo is required to make connection to primary for + * getting slots information. + */ + if (PrimaryConnInfo == NULL || *PrimaryConnInfo == '\0') + ereport(ERROR, + /* translator: %s is a GUC variable name */ + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("bad configuration for slot synchronization"), + errhint("\"%s\" must be defined.", "primary_conninfo")); + + /* + * The slot synchronization needs a database connection for walrcv_exec to + * work. + */ + dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo); + if (dbname == NULL) + ereport(ERROR, + + /* + * translator: 'dbname' is a specific option; %s is a GUC variable + * name + */ + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("bad configuration for slot synchronization"), + errhint("'dbname' must be specified in \"%s\".", "primary_conninfo")); +} + +/* + * Is current process syncing replication slots ? + */ +bool +IsSyncingReplicationSlots(void) +{ + return syncing_slots; +} + +/* + * Amount of shared memory required for slot synchronization. + */ +Size +SlotSyncShmemSize(void) +{ + return sizeof(SlotSyncCtxStruct); +} + +/* + * Allocate and initialize the shared memory of slot synchronization. + */ +void +SlotSyncShmemInit(void) +{ + bool found; + + SlotSyncCtx = (SlotSyncCtxStruct *) + ShmemInitStruct("Slot Sync Data", SlotSyncShmemSize(), &found); + + if (!found) + { + SlotSyncCtx->syncing = false; + SpinLockInit(&SlotSyncCtx->mutex); + } +} + +/* + * Error cleanup callback for slot synchronization. + */ +static void +slotsync_failure_callback(int code, Datum arg) +{ + WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg); + + if (syncing_slots) + { + /* + * If syncing_slots is true, it indicates that the process errored out + * without resetting the flag. So, we need to clean up shared memory + * and reset the flag here. + */ + SpinLockAcquire(&SlotSyncCtx->mutex); + SlotSyncCtx->syncing = false; + SpinLockRelease(&SlotSyncCtx->mutex); + + syncing_slots = false; + } + + walrcv_disconnect(wrconn); +} + +/* + * Synchronize the failover enabled replication slots using the specified + * primary server connection. 
+ */ +void +SyncReplicationSlots(WalReceiverConn *wrconn) +{ + PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn)); + { + validate_remote_info(wrconn); + + synchronize_slots(wrconn); + } + PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn)); +} diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index fd4e96c9d6..2180a38063 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -46,6 +46,7 @@ #include "common/string.h" #include "miscadmin.h" #include "pgstat.h" +#include "replication/slotsync.h" #include "replication/slot.h" #include "storage/fd.h" #include "storage/ipc.h" @@ -90,7 +91,7 @@ typedef struct ReplicationSlotOnDisk sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize #define SLOT_MAGIC 0x1051CA1 /* format identifier */ -#define SLOT_VERSION 4 /* version for new files */ +#define SLOT_VERSION 5 /* version for new files */ /* Control array for replication slot management */ ReplicationSlotCtlData *ReplicationSlotCtl = NULL; @@ -103,7 +104,6 @@ int max_replication_slots = 10; /* the maximum number of replication * slots */ static void ReplicationSlotShmemExit(int code, Datum arg); -static void ReplicationSlotDropAcquired(void); static void ReplicationSlotDropPtr(ReplicationSlot *slot); /* internal persistency functions */ @@ -250,11 +250,12 @@ ReplicationSlotValidateName(const char *name, int elevel) * user will only get commit prepared. * failover: If enabled, allows the slot to be synced to standbys so * that logical replication can be resumed after failover. + * synced: True if the slot is synchronized from the primary server. */ void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, - bool two_phase, bool failover) + bool two_phase, bool failover, bool synced) { ReplicationSlot *slot = NULL; int i; @@ -263,6 +264,34 @@ ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotValidateName(name, ERROR); + if (failover) + { + /* + * Do not allow users to create the failover enabled slots on the + * standby as we do not support sync to the cascading standby. + * + * However, failover enabled slots can be created during slot + * synchronization because we need to retain the same values as the + * remote slot. + */ + if (RecoveryInProgress() && !IsSyncingReplicationSlots()) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot enable failover for a replication slot created on the standby")); + + /* + * Do not allow users to create failover enabled temporary slots, + * because temporary slots will not be synced to the standby. + * + * However, failover enabled temporary slots can be created during + * slot synchronization. See the comments atop slotsync.c for details. + */ + if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots()) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot enable failover for a temporary replication slot")); + } + /* * If some other backend ran this code concurrently with us, we'd likely * both allocate the same slot, and that would be bad. 
@@ -315,6 +344,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	slot->data.two_phase = two_phase;
 	slot->data.two_phase_at = InvalidXLogRecPtr;
 	slot->data.failover = failover;
+	slot->data.synced = synced;
 
 	/* and then data only present in shared memory */
 	slot->just_dirtied = false;
@@ -677,6 +707,16 @@ ReplicationSlotDrop(const char *name, bool nowait)
 
 	ReplicationSlotAcquire(name, nowait);
 
+	/*
+	 * Do not allow users to drop the slots which are currently being synced
+	 * from the primary to the standby.
+	 */
+	if (RecoveryInProgress() && MyReplicationSlot->data.synced)
+		ereport(ERROR,
+				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("cannot drop replication slot \"%s\"", name),
+				errdetail("This slot is being synced from the primary server."));
+
 	ReplicationSlotDropAcquired();
 }
 
@@ -696,6 +736,38 @@ ReplicationSlotAlter(const char *name, bool failover)
 				errmsg("cannot use %s with a physical replication slot",
 					   "ALTER_REPLICATION_SLOT"));
 
+	if (RecoveryInProgress())
+	{
+		/*
+		 * Do not allow users to alter the slots which are currently being
+		 * synced from the primary to the standby.
+		 */
+		if (MyReplicationSlot->data.synced)
+			ereport(ERROR,
+					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					errmsg("cannot alter replication slot \"%s\"", name),
+					errdetail("This slot is being synced from the primary server."));
+
+		/*
+		 * Do not allow users to enable failover on the standby as we do not
+		 * support sync to the cascading standby.
+		 */
+		if (failover)
+			ereport(ERROR,
+					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					errmsg("cannot enable failover for a replication slot"
+						   " on the standby"));
+	}
+
+	/*
+	 * Do not allow users to enable failover for temporary slots as we do not
+	 * support syncing temporary slots to the standby.
+	 */
+	if (failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
+		ereport(ERROR,
+				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				errmsg("cannot enable failover for a temporary replication slot"));
+
 	if (MyReplicationSlot->data.failover != failover)
 	{
 		SpinLockAcquire(&MyReplicationSlot->mutex);
@@ -712,7 +784,7 @@ ReplicationSlotAlter(const char *name, bool failover)
 /*
 * Permanently drop the currently acquired replication slot.
 */
-static void
+void
 ReplicationSlotDropAcquired(void)
 {
	ReplicationSlot *slot = MyReplicationSlot;
@@ -868,8 +940,8 @@ ReplicationSlotMarkDirty(void)
 }
 
 /*
- * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot,
- * guaranteeing it will be there after an eventual crash.
+ * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
+ * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
  */
 void
 ReplicationSlotPersist(void)
@@ -2189,3 +2261,25 @@ RestoreSlotFromDisk(const char *name)
 				(errmsg("too many replication slots active before shutdown"),
 				 errhint("Increase max_replication_slots and try again.")));
 }
+
+/*
+ * Maps the pg_replication_slots.conflict_reason text value to
+ * ReplicationSlotInvalidationCause enum value
+ */
+ReplicationSlotInvalidationCause
+GetSlotInvalidationCause(char *conflict_reason)
+{
+	Assert(conflict_reason);
+
+	if (strcmp(conflict_reason, SLOT_INVAL_WAL_REMOVED_TEXT) == 0)
+		return RS_INVAL_WAL_REMOVED;
+	else if (strcmp(conflict_reason, SLOT_INVAL_HORIZON_TEXT) == 0)
+		return RS_INVAL_HORIZON;
+	else if (strcmp(conflict_reason, SLOT_INVAL_WAL_LEVEL_TEXT) == 0)
+		return RS_INVAL_WAL_LEVEL;
+	else
+		Assert(0);
+
+	/* Keep compiler quiet */
+	return RS_INVAL_NONE;
+}
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index eb685089b3..d2fa5e669a 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -21,7 +21,9 @@
 #include "replication/decode.h"
 #include "replication/logical.h"
 #include "replication/slot.h"
+#include "replication/slotsync.h"
 #include "utils/builtins.h"
+#include "utils/guc.h"
 #include "utils/inval.h"
 #include "utils/pg_lsn.h"
 #include "utils/resowner.h"
@@ -43,7 +45,7 @@ create_physical_replication_slot(char *name, bool immediately_reserve,
 	/* acquire replication slot, this will check for conflicting names */
 	ReplicationSlotCreate(name, false,
 						  temporary ? RS_TEMPORARY : RS_PERSISTENT, false,
-						  false);
+						  false, false);
 
 	if (immediately_reserve)
 	{
@@ -136,7 +138,7 @@ create_logical_replication_slot(char *name, char *plugin,
 	 */
 	ReplicationSlotCreate(name, true,
 						  temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
-						  failover);
+						  failover, false);
 
 	/*
 	 * Create logical decoding context to find start point or, if we don't
@@ -237,7 +239,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 16
+#define PG_GET_REPLICATION_SLOTS_COLS 17
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	XLogRecPtr	currlsn;
 	int			slotno;
@@ -418,21 +420,23 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 					break;
 
 				case RS_INVAL_WAL_REMOVED:
-					values[i++] = CStringGetTextDatum("wal_removed");
+					values[i++] = CStringGetTextDatum(SLOT_INVAL_WAL_REMOVED_TEXT);
 					break;
 
 				case RS_INVAL_HORIZON:
-					values[i++] = CStringGetTextDatum("rows_removed");
+					values[i++] = CStringGetTextDatum(SLOT_INVAL_HORIZON_TEXT);
 					break;
 
 				case RS_INVAL_WAL_LEVEL:
-					values[i++] = CStringGetTextDatum("wal_level_insufficient");
+					values[i++] = CStringGetTextDatum(SLOT_INVAL_WAL_LEVEL_TEXT);
 					break;
 			}
 		}
 
 		values[i++] = BoolGetDatum(slot_contents.data.failover);
 
+		values[i++] = BoolGetDatum(slot_contents.data.synced);
+
 		Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
 
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
@@ -700,7 +704,6 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 	XLogRecPtr	src_restart_lsn;
 	bool		src_islogical;
 	bool		temporary;
-	bool		failover;
 	char	   *plugin;
 	Datum		values[2];
 	bool		nulls[2];
@@ -756,7 +759,6 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 	src_islogical = SlotIsLogical(&first_slot_contents);
 	src_restart_lsn = first_slot_contents.data.restart_lsn;
 	temporary = (first_slot_contents.data.persistency == RS_TEMPORARY);
-	failover = first_slot_contents.data.failover;
 	plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL;
 
 	/* Check type of replication slot */
@@ -791,12 +793,20 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 		 * We must not try to read WAL, since we haven't reserved it yet --
 		 * hence pass find_startpoint false. confirmed_flush will be set
 		 * below, by copying from the source slot.
+		 *
+		 * To avoid potential issues with the slot synchronization where the
+		 * restart_lsn of a replication slot can go backward, we set the
+		 * failover option to false here.  This situation occurs when a slot
+		 * on the primary server is dropped and immediately replaced with a
+		 * new slot of the same name, created by copying from another existing
+		 * slot.  However, the slot synchronization will only observe the
+		 * restart_lsn of the same slot going backward.
 		 */
 		create_logical_replication_slot(NameStr(*dst_name),
 										plugin,
 										temporary,
 										false,
-										failover,
+										false,
 										src_restart_lsn,
 										false);
 	}
@@ -943,3 +953,49 @@ pg_copy_physical_replication_slot_b(PG_FUNCTION_ARGS)
 {
 	return copy_replication_slot(fcinfo, false);
 }
+
+/*
+ * Synchronize failover enabled replication slots to a standby server
+ * from the primary server.
+ */
+Datum
+pg_sync_replication_slots(PG_FUNCTION_ARGS)
+{
+	WalReceiverConn *wrconn;
+	char	   *err;
+	StringInfoData app_name;
+
+	CheckSlotPermissions();
+
+	if (!RecoveryInProgress())
+		ereport(ERROR,
+				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("replication slots can only be synchronized to a standby server"));
+
+	/* Load the libpq-specific functions */
+	load_file("libpqwalreceiver", false);
+
+	ValidateSlotSyncParams();
+
+	initStringInfo(&app_name);
+	if (cluster_name[0])
+		appendStringInfo(&app_name, "%s_slotsync", cluster_name);
+	else
+		appendStringInfoString(&app_name, "slotsync");
+
+	/* Connect to the primary server. */
+	wrconn = walrcv_connect(PrimaryConnInfo, false, false, false,
+							app_name.data, &err);
+	pfree(app_name.data);
+
+	if (!wrconn)
+		ereport(ERROR,
+				errcode(ERRCODE_CONNECTION_FAILURE),
+				errmsg("could not connect to the primary server: %s", err));
+
+	SyncReplicationSlots(wrconn);
+
+	walrcv_disconnect(wrconn);
+
+	PG_RETURN_VOID();
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 146826d5db..4e54779a9e 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -72,6 +72,7 @@
 #include "postmaster/interrupt.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
+#include "replication/slotsync.h"
 #include "replication/slot.h"
 #include "replication/snapbuild.h"
 #include "replication/syncrep.h"
@@ -243,7 +244,6 @@ static void WalSndShutdown(void) pg_attribute_noreturn();
 static void XLogSendPhysical(void);
 static void XLogSendLogical(void);
 static void WalSndDone(WalSndSendDataCallback send_data);
-static XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli);
 static void IdentifySystem(void);
 static void UploadManifest(void);
 static bool HandleUploadManifestPacket(StringInfo buf, off_t *offset,
@@ -1224,7 +1224,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 	{
 		ReplicationSlotCreate(cmd->slotname, false,
 							  cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
-							  false, false);
+							  false, false, false);
 
 		if (reserve_wal)
 		{
@@ -1255,7 +1255,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		 */
 		ReplicationSlotCreate(cmd->slotname, true,
 							  cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
-							  two_phase, failover);
+							  two_phase, failover, false);
 
 		/*
 		 * Do options check early so that we can bail before calling the
@@ -3385,14 +3385,17 @@ WalSndDone(WalSndSendDataCallback send_data)
 }
 
 /*
- * Returns the latest point in WAL that has been safely flushed to disk, and
- * can be sent to the standby. This should only be called when in recovery,
- * ie. we're streaming to a cascaded standby.
+ * Returns the latest point in WAL that has been safely flushed to disk.
+ * This should only be called when in recovery.
+ *
+ * This is called either by cascading walsender to find WAL position to be sent
+ * to a cascaded standby or by the slot synchronization function to validate
+ * remote slot's lsn before syncing it locally.
 *
 * As a side-effect, *tli is updated to the TLI of the last
 * replayed WAL record.
 */
-static XLogRecPtr
+XLogRecPtr
 GetStandbyFlushRecPtr(TimeLineID *tli)
 {
 	XLogRecPtr	replayPtr;
@@ -3401,6 +3404,8 @@ GetStandbyFlushRecPtr(TimeLineID *tli)
 	TimeLineID	receiveTLI;
 	XLogRecPtr	result;
 
+	Assert(am_cascading_walsender || IsSyncingReplicationSlots());
+
 	/*
 	 * We can safely send what's already been replayed. Also, if walreceiver
 	 * is streaming WAL from the same timeline, we can send anything that it
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 7084e18861..7e7941d625 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -36,6 +36,7 @@
 #include "replication/logicallauncher.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
+#include "replication/slotsync.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "storage/bufmgr.h"
@@ -153,6 +154,7 @@ CalculateShmemSize(int *num_semaphores)
 	size = add_size(size, StatsShmemSize());
 	size = add_size(size, WaitEventExtensionShmemSize());
 	size = add_size(size, InjectionPointShmemSize());
+	size = add_size(size, SlotSyncShmemSize());
 #ifdef EXEC_BACKEND
 	size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -347,6 +349,7 @@ CreateOrAttachShmemStructs(void)
 	WalSummarizerShmemInit();
 	PgArchShmemInit();
 	ApplyLauncherShmemInit();
+	SlotSyncShmemInit();
 
 	/*
 	 * Set up other modules that need some shared memory space
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 9fc8ac9290..75e1fc8433 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -57,6 +57,6 @@
 */
 
 /*							yyyymmddN */
-#define CATALOG_VERSION_NO	202401301
+#define CATALOG_VERSION_NO	202402141
 
 #endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 29af4ce65d..9c120fc2b7 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11127,9 +11127,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,bool}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason,failover}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,bool,bool}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason,failover,synced}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
@@ -11212,6 +11212,10 @@
   proname => 'pg_logical_emit_message', provolatile => 'v', proparallel => 'u',
   prorettype => 'pg_lsn', proargtypes => 'bool text bytea bool',
   prosrc => 'pg_logical_emit_message_bytea' },
+{ oid => '9929', descr => 'sync replication slots from the primary to the standby',
+  proname => 'pg_sync_replication_slots', provolatile => 'v', proparallel => 'u',
+  prorettype => 'void', proargtypes => '',
+  prosrc => 'pg_sync_replication_slots' },
 
 # event triggers
 { oid => '3566', descr => 'list objects dropped by the current command',
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index da4c776492..e706ca834c 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -52,6 +52,14 @@ typedef enum ReplicationSlotInvalidationCause
 	RS_INVAL_WAL_LEVEL,
 } ReplicationSlotInvalidationCause;
 
+/*
+ * The possible values for 'conflict_reason' returned in
+ * pg_get_replication_slots.
+ */
+#define SLOT_INVAL_WAL_REMOVED_TEXT "wal_removed"
+#define SLOT_INVAL_HORIZON_TEXT "rows_removed"
+#define SLOT_INVAL_WAL_LEVEL_TEXT "wal_level_insufficient"
+
 /*
 * On-Disk data of a replication slot, preserved across restarts.
 */
@@ -112,6 +120,11 @@ typedef struct ReplicationSlotPersistentData
 	/* plugin name */
 	NameData	plugin;
 
+	/*
+	 * Was this slot synchronized from the primary server?
+	 */
+	char		synced;
+
 	/*
 	 * Is this a failover slot (sync candidate for standbys)? Only relevant
 	 * for logical slots on the primary server.
@@ -224,9 +237,11 @@ extern void ReplicationSlotsShmemInit(void);
 /* management of individual slots */
 extern void ReplicationSlotCreate(const char *name, bool db_specific,
								  ReplicationSlotPersistency persistency,
-								  bool two_phase, bool failover);
+								  bool two_phase, bool failover,
+								  bool synced);
 extern void ReplicationSlotPersist(void);
 extern void ReplicationSlotDrop(const char *name, bool nowait);
+extern void ReplicationSlotDropAcquired(void);
 extern void ReplicationSlotAlter(const char *name, bool failover);
 
 extern void ReplicationSlotAcquire(const char *name, bool nowait);
@@ -259,5 +274,7 @@ extern void CheckPointReplicationSlots(bool is_shutdown);
 extern void CheckSlotRequirements(void);
 extern void CheckSlotPermissions(void);
+extern ReplicationSlotInvalidationCause
+			GetSlotInvalidationCause(char *conflict_reason);
 
 #endif							/* SLOT_H */
diff --git a/src/include/replication/slotsync.h b/src/include/replication/slotsync.h
new file mode 100644
index 0000000000..e86d8a47b8
--- /dev/null
+++ b/src/include/replication/slotsync.h
@@ -0,0 +1,23 @@
+/*-------------------------------------------------------------------------
+ *
+ * slotsync.h
+ *	  Exports for slot synchronization.
+ *
+ * Portions Copyright (c) 2016-2024, PostgreSQL Global Development Group
+ *
+ * src/include/replication/slotsync.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef SLOTSYNC_H
+#define SLOTSYNC_H
+
+#include "replication/walreceiver.h"
+
+extern void ValidateSlotSyncParams(void);
+extern bool IsSyncingReplicationSlots(void);
+extern Size SlotSyncShmemSize(void);
+extern void SlotSyncShmemInit(void);
+extern void SyncReplicationSlots(WalReceiverConn *wrconn);
+
+#endif							/* SLOTSYNC_H */
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 1b58d50b3b..0c3996e926 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -12,6 +12,8 @@
 #ifndef _WALSENDER_H
 #define _WALSENDER_H
 
+#include "access/xlogdefs.h"
+
 /*
  * What to do with a snapshot in create replication slot command.
  */
@@ -37,6 +39,7 @@ extern void InitWalSender(void);
 extern bool exec_replication_command(const char *cmd_string);
 extern void WalSndErrorCleanup(void);
 extern void WalSndResourceCleanup(bool isCommit);
+extern XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli);
 extern void WalSndSignals(void);
 extern Size WalSndShmemSize(void);
 extern void WalSndShmemInit(void);
diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl
index bc58ff4cab..c96515d178 100644
--- a/src/test/recovery/t/040_standby_failover_slots_sync.pl
+++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl
@@ -97,4 +97,241 @@ my ($result, $stdout, $stderr) = $subscriber1->psql('postgres',
 ok( $stderr =~ /ERROR:  cannot set failover for enabled subscription/,
 	"altering failover is not allowed for enabled subscription");
 
+##################################################
+# Test that pg_sync_replication_slots() cannot be executed on a non-standby server.
+##################################################
+
+($result, $stdout, $stderr) =
+	$publisher->psql('postgres', "SELECT pg_sync_replication_slots();");
+ok( $stderr =~
+	  /ERROR:  replication slots can only be synchronized to a standby server/,
+	"cannot sync slots on a non-standby server");
+
+##################################################
+# Test logical failover slots on the standby
+# Configure standby1 to replicate and synchronize logical slots configured
+# for failover on the primary
+#
+#              failover slot lsub1_slot ->| ----> subscriber1 (connected via logical replication)
+#              failover slot lsub2_slot   |       inactive
+# primary --->                            |
+#              physical slot sb1_slot --->| ----> standby1 (connected via streaming replication)
+#                                         |       lsub1_slot, lsub2_slot (synced_slot)
+##################################################
+
+my $primary = $publisher;
+my $backup_name = 'backup';
+$primary->backup($backup_name);
+
+# Create a standby
+my $standby1 = PostgreSQL::Test::Cluster->new('standby1');
+$standby1->init_from_backup(
+	$primary, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+
+my $connstr_1 = $primary->connstr;
+$standby1->append_conf(
+	'postgresql.conf', qq(
+hot_standby_feedback = on
+primary_slot_name = 'sb1_slot'
+primary_conninfo = '$connstr_1 dbname=postgres'
+));
+
+$primary->psql('postgres',
+	q{SELECT pg_create_logical_replication_slot('lsub2_slot', 'test_decoding', false, false, true);}
+);
+
+$primary->psql('postgres',
+	q{SELECT pg_create_physical_replication_slot('sb1_slot');});
+
+# Start the standby so that slot syncing can begin
+$standby1->start;
+
+$primary->wait_for_catchup('regress_mysub1');
+
+# Do not allow any further advancement of the restart_lsn for the lsub1_slot.
+$subscriber1->safe_psql('postgres',
+	"ALTER SUBSCRIPTION regress_mysub1 DISABLE");
+
+# Wait for the replication slot to become inactive on the publisher
+$primary->poll_query_until(
+	'postgres',
+	"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active = 'f'",
+	1);
+
+# Wait for the standby to catch up so that the standby is not lagging behind
+# the subscriber.
+$primary->wait_for_replay_catchup($standby1);
+
+# Synchronize the primary server slots to the standby.
+$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
+
+# Confirm that the logical failover slots are created on the standby and are
+# flagged as 'synced'
+is( $standby1->safe_psql(
+		'postgres',
+		q{SELECT count(*) = 2 FROM pg_replication_slots WHERE slot_name IN ('lsub1_slot', 'lsub2_slot') AND synced;}
+	),
+	"t",
+	'logical slots have synced as true on standby');
+
+##################################################
+# Test that the synchronized slot will be dropped if the corresponding remote
+# slot on the primary server has been dropped.
+##################################################
+
+$primary->psql('postgres', "SELECT pg_drop_replication_slot('lsub2_slot');");
+
+$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
+
+is( $standby1->safe_psql(
+		'postgres',
+		q{SELECT count(*) = 0 FROM pg_replication_slots WHERE slot_name = 'lsub2_slot';}
+	),
+	"t",
+	'synchronized slot has been dropped');
+
+##################################################
+# Test that if the synchronized slot is invalidated while the remote slot is
+# still valid, the slot will be dropped and re-created on the standby by
+# executing pg_sync_replication_slots() again.
+##################################################
+
+# Configure the max_slot_wal_keep_size so that the synced slot can be
+# invalidated due to wal removal.
+$standby1->append_conf('postgresql.conf', 'max_slot_wal_keep_size = 64kB');
+$standby1->reload;
+
+# Generate some activity and switch WAL file on the primary
+$primary->advance_wal(1);
+$primary->psql('postgres', "CHECKPOINT");
+$primary->wait_for_replay_catchup($standby1);
+
+# Request a checkpoint on the standby to trigger the WAL file(s) removal
+$standby1->safe_psql('postgres', "CHECKPOINT");
+
+# Check if the synced slot is invalidated
+is( $standby1->safe_psql(
+		'postgres',
+		q{SELECT conflict_reason = 'wal_removed' FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}
+	),
+	"t",
+	'synchronized slot has been invalidated');
+
+# Reset max_slot_wal_keep_size to avoid further wal removal
+$standby1->append_conf('postgresql.conf', 'max_slot_wal_keep_size = -1');
+$standby1->reload;
+
+# Enable the subscription to let it catch up to the latest wal position
+$subscriber1->safe_psql('postgres',
+	"ALTER SUBSCRIPTION regress_mysub1 ENABLE");
+
+$primary->wait_for_catchup('regress_mysub1');
+
+# Do not allow any further advancement of the restart_lsn for the lsub1_slot.
+$subscriber1->safe_psql('postgres',
+	"ALTER SUBSCRIPTION regress_mysub1 DISABLE");
+
+# Wait for the replication slot to become inactive on the publisher
+$primary->poll_query_until(
+	'postgres',
+	"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active = 'f'",
+	1);
+
+# Wait for the standby to catch up so that the standby is not lagging behind
+# the subscriber.
+$primary->wait_for_replay_catchup($standby1);
+
+my $log_offset = -s $standby1->logfile;
+
+# Synchronize the primary server slots to the standby.
+$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
+
+# Confirm that the invalidated slot has been dropped.
+$standby1->wait_for_log(qr/dropped replication slot "lsub1_slot" of dbid [0-9]+/,
+	$log_offset);
+
+# Confirm that the logical slot has been re-created on the standby and is
+# flagged as 'synced'
+is( $standby1->safe_psql(
+		'postgres',
+		q{SELECT conflict_reason IS NULL AND synced FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}
+	),
+	"t",
+	'logical slot is re-synced');
+
+##################################################
+# Test that a synchronized slot cannot be decoded, altered or dropped by the
+# user
+##################################################
+
+# Attempting to perform logical decoding on a synced slot should result in an error
+($result, $stdout, $stderr) = $standby1->psql('postgres',
+	"select * from pg_logical_slot_get_changes('lsub1_slot', NULL, NULL);");
+ok( $stderr =~
+	  /ERROR:  cannot use replication slot "lsub1_slot" for logical decoding/,
+	"logical decoding is not allowed on synced slot");
+
+# Attempting to alter a synced slot should result in an error
+($result, $stdout, $stderr) = $standby1->psql(
+	'postgres',
+	qq[ALTER_REPLICATION_SLOT lsub1_slot (failover);],
+	replication => 'database');
+ok($stderr =~ /ERROR:  cannot alter replication slot "lsub1_slot"/,
+	"synced slot on standby cannot be altered");
+
+# Attempting to drop a synced slot should result in an error
+($result, $stdout, $stderr) = $standby1->psql('postgres',
+	"SELECT pg_drop_replication_slot('lsub1_slot');");
+ok($stderr =~ /ERROR:  cannot drop replication slot "lsub1_slot"/,
+	"synced slot on standby cannot be dropped");
+
+##################################################
+# Test that we cannot synchronize slots if dbname is not specified in the
+# primary_conninfo.
+##################################################
+
+$standby1->append_conf('postgresql.conf', "primary_conninfo = '$connstr_1'");
+$standby1->reload;
+
+($result, $stdout, $stderr) =
+	$standby1->psql('postgres', "SELECT pg_sync_replication_slots();");
+ok( $stderr =~
+	  /HINT:  'dbname' must be specified in "primary_conninfo"/,
+	"cannot sync slots if dbname is not specified in primary_conninfo");
+
+##################################################
+# Test that we cannot synchronize slots to a cascading standby server.
+##################################################
+
+# Create a cascading standby
+$backup_name = 'backup2';
+$standby1->backup($backup_name);
+
+my $cascading_standby = PostgreSQL::Test::Cluster->new('cascading_standby');
+$cascading_standby->init_from_backup(
+	$standby1, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+
+my $cascading_connstr = $standby1->connstr;
+$cascading_standby->append_conf(
+	'postgresql.conf', qq(
+hot_standby_feedback = on
+primary_slot_name = 'cascading_sb_slot'
+primary_conninfo = '$cascading_connstr dbname=postgres'
+));
+
+$standby1->psql('postgres',
+	q{SELECT pg_create_physical_replication_slot('cascading_sb_slot');});
+
+$cascading_standby->start;
+
+($result, $stdout, $stderr) =
+	$cascading_standby->psql('postgres', "SELECT pg_sync_replication_slots();");
+ok( $stderr =~
+	  /ERROR:  cannot synchronize replication slots from a standby server/,
+	"cannot sync slots to a cascading standby server");
+
 done_testing();
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index abc944e8b8..b7488d760e 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1474,8 +1474,9 @@ pg_replication_slots| SELECT l.slot_name,
     l.safe_wal_size,
     l.two_phase,
     l.conflict_reason,
-    l.failover
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason, failover)
+    l.failover,
+    l.synced
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason, failover, synced)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 91433d439b..d808aad8b0 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2325,6 +2325,7 @@ RelocationBufferInfo
 RelptrFreePageBtree
 RelptrFreePageManager
 RelptrFreePageSpanLeader
+RemoteSlot
 RenameStmt
 ReopenPtrType
 ReorderBuffer
@@ -2584,6 +2585,7 @@ SlabBlock
 SlabContext
 SlabSlot
 SlotNumber
+SlotSyncCtxStruct
 SlruCtl
 SlruCtlData
 SlruErrorCause
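
Taken together, the changes above add up to a small user-visible workflow. The following is a minimal SQL sketch, not part of the patch itself, assuming a standby configured as in the TAP test above (a physical slot named in primary_slot_name, hot_standby_feedback = on, and a dbname in primary_conninfo); the slot name regress_slot is illustrative only.

-- On the primary: create a logical slot with failover enabled
-- (the fifth argument of pg_create_logical_replication_slot).
SELECT pg_create_logical_replication_slot('regress_slot', 'test_decoding',
                                          false, false, true);

-- On the standby: synchronize the failover slots from the primary.
-- Requires the REPLICATION attribute and a standby server.
SELECT pg_sync_replication_slots();

-- On the standby: confirm the slot arrived and carries the new
-- synced flag exposed by pg_get_replication_slots.
SELECT slot_name, failover, synced, temporary
FROM pg_replication_slots
WHERE slot_name = 'regress_slot';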
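
slot.h now pins down the three user-visible conflict_reason strings, and GetSlotInvalidationCause maps such a string back to its ReplicationSlotInvalidationCause. A monitoring query along the following lines, again a sketch rather than part of the patch, shows how an invalidated synced slot surfaces on the standby and how the drop-and-recreate cycle from the TAP test looks in SQL.

-- The non-NULL conflict_reason values are 'wal_removed',
-- 'rows_removed' and 'wal_level_insufficient', per the
-- SLOT_INVAL_*_TEXT constants added to slot.h.
SELECT slot_name, synced, conflict_reason
FROM pg_replication_slots
WHERE conflict_reason IS NOT NULL;

-- While the remote slot is still valid, the next synchronization
-- call drops the invalidated synced slot and re-creates it.
SELECT pg_sync_replication_slots();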
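
The TAP test also fixes what a user may not do with a synced slot on the standby. Here are the same checks in plain SQL, a sketch with the slot name lsub1_slot taken from the test and the expected errors quoted from its regexes:

-- Logical decoding from a synced slot is rejected:
SELECT * FROM pg_logical_slot_get_changes('lsub1_slot', NULL, NULL);
-- ERROR:  cannot use replication slot "lsub1_slot" for logical decoding

-- Dropping a synced slot is rejected:
SELECT pg_drop_replication_slot('lsub1_slot');
-- ERROR:  cannot drop replication slot "lsub1_slot"

Altering the slot is rejected as well, but ALTER_REPLICATION_SLOT is a replication-protocol command, which is why the test issues it over a replication connection instead of plain SQL.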
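
Finally, calling pg_sync_replication_slots() in the wrong environment fails up front. A sketch of the three failure modes the tests cover, with the messages quoted from the test regexes:

-- On a primary (non-standby) server:
SELECT pg_sync_replication_slots();
-- ERROR:  replication slots can only be synchronized to a standby server

-- On a cascading standby:
SELECT pg_sync_replication_slots();
-- ERROR:  cannot synchronize replication slots from a standby server

-- On a standby whose primary_conninfo lacks a dbname:
SELECT pg_sync_replication_slots();
-- HINT:  'dbname' must be specified in "primary_conninfo"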