From 3741f2a09d5205ec32bd8af5d1f397e08995932b Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Fri, 12 Apr 2024 15:03:28 +0530 Subject: [PATCH] Fix the review comments and a bug in the slot sync code. Ensure that when updating the catalog_xmin of the synced slots, it is written to disk before the in-memory value (effective_catalog_xmin) is changed. This prevents a scenario where the in-memory value change triggers a vacuum to remove catalog tuples before the catalog_xmin is written to disk. In the event of a crash before the catalog_xmin is persisted, we would not know that some required catalog tuples have been removed and the synced slot would be invalidated. Change the sanity check to ensure that the remote slot's confirmed_flush LSN can't precede that of the local/synced slot during slot sync. Note that the restart_lsn of the synced/local slot can be ahead of that of the remote slot. This can happen when the slot advancing machinery finds a running xacts record, after reaching the consistent state, at a later point than the primary, where it serializes the snapshot and updates the restart_lsn. Make the slot sync check more robust by syncing only when the confirmed_lsn, restart_lsn, or catalog_xmin of the remote slot is ahead of that of the synced/local slot. 
Reported-by: Amit Kapila and Shveta Malik Author: Hou Zhijie, Shveta Malik Reviewed-by: Amit Kapila, Bertrand Drouvot Discussion: https://postgr.es/m/OS0PR01MB57162B67D3CB01B2756FBA6D94062@OS0PR01MB5716.jpnprd01.prod.outlook.com Discussion: https://postgr.es/m/CAJpy0uCSS5zmdyUXhvw41HSdTbRqX1hbYqkOfHNj7qQ+2zn0AQ@mail.gmail.com --- src/backend/replication/logical/slotsync.c | 161 +++++++++++++++------ 1 file changed, 118 insertions(+), 43 deletions(-) diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 97440cb6bf..bda0de52db 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -162,22 +162,78 @@ static void update_synced_slots_inactive_since(void); * *found_consistent_snapshot will be true iff the remote slot's LSN or xmin is * modified, and decoding from the corresponding LSN's can reach a * consistent snapshot. + * + * *remote_slot_precedes will be true if the remote slot's LSN or xmin + * precedes locally reserved position. */ static bool update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, - bool *found_consistent_snapshot) + bool *found_consistent_snapshot, + bool *remote_slot_precedes) { ReplicationSlot *slot = MyReplicationSlot; - bool slot_updated = false; + bool updated_xmin_or_lsn = false; + bool updated_config = false; Assert(slot->data.invalidated == RS_INVAL_NONE); if (found_consistent_snapshot) *found_consistent_snapshot = false; - if (remote_slot->confirmed_lsn != slot->data.confirmed_flush || - remote_slot->restart_lsn != slot->data.restart_lsn || - remote_slot->catalog_xmin != slot->data.catalog_xmin) + if (remote_slot_precedes) + *remote_slot_precedes = false; + + /* + * Don't overwrite if we already have a newer catalog_xmin and + * restart_lsn. 
+ */ + if (remote_slot->restart_lsn < slot->data.restart_lsn || + TransactionIdPrecedes(remote_slot->catalog_xmin, + slot->data.catalog_xmin)) + { + /* + * This can happen in following situations: + * + * If the slot is temporary, it means either the initial WAL location + * reserved for the local slot is ahead of the remote slot's + * restart_lsn or the initial xmin_horizon computed for the local slot + * is ahead of the remote slot. + * + * If the slot is persistent, restart_lsn of the synced slot could + * still be ahead of the remote slot. Since we use slot advance + * functionality to keep snapbuild/slot updated, it is possible that + * the restart_lsn is advanced to a later position than it has on the + * primary. This can happen when slot advancing machinery finds + * running xacts record after reaching the consistent state at a later + * point than the primary where it serializes the snapshot and updates + * the restart_lsn. + * + * We LOG the message if the slot is temporary as it can help the user + * to understand why the slot is not sync-ready. In the case of a + * persistent slot, it would be a more common case and won't directly + * impact the users, so we used DEBUG1 level to log the message. + */ + ereport(slot->data.persistency == RS_TEMPORARY ? LOG : DEBUG1, + errmsg("could not sync slot \"%s\" as remote slot precedes local slot", + remote_slot->name), + errdetail("Remote slot has LSN %X/%X and catalog xmin %u, but local slot has LSN %X/%X and catalog xmin %u.", + LSN_FORMAT_ARGS(remote_slot->restart_lsn), + remote_slot->catalog_xmin, + LSN_FORMAT_ARGS(slot->data.restart_lsn), + slot->data.catalog_xmin)); + + if (remote_slot_precedes) + *remote_slot_precedes = true; + } + + /* + * Attempt to sync LSNs and xmins only if remote slot is ahead of local + * slot. 
+ */ + else if (remote_slot->confirmed_lsn > slot->data.confirmed_flush || + remote_slot->restart_lsn > slot->data.restart_lsn || + TransactionIdFollows(remote_slot->catalog_xmin, + slot->data.catalog_xmin)) { /* * We can't directly copy the remote slot's LSN or xmin unless there @@ -198,7 +254,6 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, slot->data.restart_lsn = remote_slot->restart_lsn; slot->data.confirmed_flush = remote_slot->confirmed_lsn; slot->data.catalog_xmin = remote_slot->catalog_xmin; - slot->effective_catalog_xmin = remote_slot->catalog_xmin; SpinLockRelease(&slot->mutex); if (found_consistent_snapshot) @@ -208,12 +263,18 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, { LogicalSlotAdvanceAndCheckSnapState(remote_slot->confirmed_lsn, found_consistent_snapshot); + + /* Sanity check */ + if (slot->data.confirmed_flush != remote_slot->confirmed_lsn) + ereport(ERROR, + errmsg_internal("synchronized confirmed_flush for slot \"%s\" differs from remote slot", + remote_slot->name), + errdetail_internal("Remote slot has LSN %X/%X but local slot has LSN %X/%X.", + LSN_FORMAT_ARGS(remote_slot->confirmed_lsn), + LSN_FORMAT_ARGS(slot->data.confirmed_flush))); } - ReplicationSlotsComputeRequiredXmin(false); - ReplicationSlotsComputeRequiredLSN(); - - slot_updated = true; + updated_xmin_or_lsn = true; } if (remote_dbid != slot->data.database || @@ -233,10 +294,37 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, slot->data.failover = remote_slot->failover; SpinLockRelease(&slot->mutex); - slot_updated = true; + updated_config = true; } - return slot_updated; + /* + * We have to write the changed xmin to disk *before* we change the + * in-memory value, otherwise after a crash we wouldn't know that some + * catalog tuples might have been removed already. 
+ */ + if (updated_config || updated_xmin_or_lsn) + { + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + } + + /* + * Now the new xmin is safely on disk, we can let the global value + * advance. We do not take ProcArrayLock or similar since we only advance + * xmin here and there's not much harm done by a concurrent computation + * missing that. + */ + if (updated_xmin_or_lsn) + { + SpinLockAcquire(&slot->mutex); + slot->effective_catalog_xmin = remote_slot->catalog_xmin; + SpinLockRelease(&slot->mutex); + + ReplicationSlotsComputeRequiredXmin(false); + ReplicationSlotsComputeRequiredLSN(); + } + + return updated_config || updated_xmin_or_lsn; } /* @@ -460,14 +548,17 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) { ReplicationSlot *slot = MyReplicationSlot; bool found_consistent_snapshot = false; + bool remote_slot_precedes = false; + + (void) update_local_synced_slot(remote_slot, remote_dbid, + &found_consistent_snapshot, + &remote_slot_precedes); /* * Check if the primary server has caught up. Refer to the comment atop * the file for details on this check. */ - if (remote_slot->restart_lsn < slot->data.restart_lsn || - TransactionIdPrecedes(remote_slot->catalog_xmin, - slot->data.catalog_xmin)) + if (remote_slot_precedes) { /* * The remote slot didn't catch up to locally reserved position. @@ -476,23 +567,10 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) * current location when recreating the slot in the next cycle. It may * take more time to create such a slot. Therefore, we keep this slot * and attempt the synchronization in the next cycle. - * - * XXX should this be changed to elog(DEBUG1) perhaps? 
*/ - ereport(LOG, - errmsg("could not sync slot \"%s\" as remote slot precedes local slot", - remote_slot->name), - errdetail("Remote slot has LSN %X/%X and catalog xmin %u, but local slot has LSN %X/%X and catalog xmin %u.", - LSN_FORMAT_ARGS(remote_slot->restart_lsn), - remote_slot->catalog_xmin, - LSN_FORMAT_ARGS(slot->data.restart_lsn), - slot->data.catalog_xmin)); return false; } - (void) update_local_synced_slot(remote_slot, remote_dbid, - &found_consistent_snapshot); - /* * Don't persist the slot if it cannot reach the consistent point from the * restart_lsn. See comments atop this file. @@ -633,23 +711,20 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) /* * Sanity check: As long as the invalidations are handled * appropriately as above, this should never happen. + * + * We don't need to check restart_lsn here. See the comments in + * update_local_synced_slot() for details. */ - if (remote_slot->restart_lsn < slot->data.restart_lsn) - elog(ERROR, - "cannot synchronize local slot \"%s\" LSN(%X/%X)" - " to remote slot's LSN(%X/%X) as synchronization" - " would move it backwards", remote_slot->name, - LSN_FORMAT_ARGS(slot->data.restart_lsn), - LSN_FORMAT_ARGS(remote_slot->restart_lsn)); + if (remote_slot->confirmed_lsn < slot->data.confirmed_flush) + ereport(ERROR, + errmsg_internal("cannot synchronize local slot \"%s\"", + remote_slot->name), + errdetail_internal("Local slot's start streaming location LSN(%X/%X) is ahead of remote slot's LSN(%X/%X).", + LSN_FORMAT_ARGS(slot->data.confirmed_flush), + LSN_FORMAT_ARGS(remote_slot->confirmed_lsn))); - /* Make sure the slot changes persist across server restart */ - if (update_local_synced_slot(remote_slot, remote_dbid, NULL)) - { - ReplicationSlotMarkDirty(); - ReplicationSlotSave(); - - slot_updated = true; - } + slot_updated = update_local_synced_slot(remote_slot, remote_dbid, + NULL, NULL); } } /* Otherwise create the slot first. */