diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index d9e10263bb..2806e1076c 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -318,32 +318,43 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) /* * Helper function for advancing physical replication slot forward. + * The LSN position to move to is compared simply to the slot's + * restart_lsn, knowing that any position older than that would be + * removed by successive checkpoints. */ static XLogRecPtr -pg_physical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto) +pg_physical_replication_slot_advance(XLogRecPtr moveto) { - XLogRecPtr retlsn = InvalidXLogRecPtr; + XLogRecPtr startlsn = MyReplicationSlot->data.restart_lsn; + XLogRecPtr retlsn = startlsn; - SpinLockAcquire(&MyReplicationSlot->mutex); - if (MyReplicationSlot->data.restart_lsn < moveto) + if (startlsn < moveto) { + SpinLockAcquire(&MyReplicationSlot->mutex); MyReplicationSlot->data.restart_lsn = moveto; + SpinLockRelease(&MyReplicationSlot->mutex); retlsn = moveto; } - SpinLockRelease(&MyReplicationSlot->mutex); return retlsn; } /* * Helper function for advancing logical replication slot forward. + * The slot's restart_lsn is used as start point for reading records, + * while confirmed_lsn is used as base point for the decoding context. + * The LSN position to move to is checked by doing a per-record scan and + * logical decoding which makes sure that confirmed_lsn is updated to a + * LSN which allows the future slot consumer to get consistent logical + * changes. */ static XLogRecPtr -pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto) +pg_logical_replication_slot_advance(XLogRecPtr moveto) { LogicalDecodingContext *ctx; ResourceOwner old_resowner = CurrentResourceOwner; - XLogRecPtr retlsn = InvalidXLogRecPtr; + XLogRecPtr startlsn = MyReplicationSlot->data.restart_lsn; + XLogRecPtr retlsn = MyReplicationSlot->data.confirmed_flush; PG_TRY(); { @@ -384,7 +395,7 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto) if (record != NULL) LogicalDecodingProcessRecord(ctx, ctx->reader); - /* check limits */ + /* Stop once the moving point wanted by caller has been reached */ if (moveto <= ctx->reader->EndRecPtr) break; @@ -441,7 +452,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS) Name slotname = PG_GETARG_NAME(0); XLogRecPtr moveto = PG_GETARG_LSN(1); XLogRecPtr endlsn; - XLogRecPtr startlsn; + XLogRecPtr minlsn; TupleDesc tupdesc; Datum values[2]; bool nulls[2]; @@ -472,21 +483,32 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS) /* Acquire the slot so we "own" it */ ReplicationSlotAcquire(NameStr(*slotname), true); - startlsn = MyReplicationSlot->data.confirmed_flush; - if (moveto < startlsn) + /* + * Check if the slot is not moving backwards. Physical slots rely simply + * on restart_lsn as a minimum point, while logical slots have confirmed + * consumption up to confirmed_lsn, meaning that in both cases data older + * than that is not available anymore. + */ + if (OidIsValid(MyReplicationSlot->data.database)) + minlsn = MyReplicationSlot->data.confirmed_flush; + else + minlsn = MyReplicationSlot->data.restart_lsn; + + if (moveto < minlsn) { ReplicationSlotRelease(); ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot move slot to %X/%X, minimum is %X/%X", (uint32) (moveto >> 32), (uint32) moveto, - (uint32) (startlsn >> 32), (uint32) startlsn))); + (uint32) (minlsn >> 32), (uint32) minlsn))); } + /* Do the actual slot update, depending on the slot type */ if (OidIsValid(MyReplicationSlot->data.database)) - endlsn = pg_logical_replication_slot_advance(startlsn, moveto); + endlsn = pg_logical_replication_slot_advance(moveto); else - endlsn = pg_physical_replication_slot_advance(startlsn, moveto); + endlsn = pg_physical_replication_slot_advance(moveto); values[0] = NameGetDatum(&MyReplicationSlot->data.name); nulls[0] = false;