diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 1393591538..61588d626f 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -297,10 +297,12 @@ CreateInitDecodingContext(char *plugin, xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot); + SpinLockAcquire(&slot->mutex); slot->effective_catalog_xmin = xmin_horizon; slot->data.catalog_xmin = xmin_horizon; if (need_full_snapshot) slot->effective_xmin = xmin_horizon; + SpinLockRelease(&slot->mutex); ReplicationSlotsComputeRequiredXmin(true); @@ -445,13 +447,14 @@ void DecodingContextFindStartpoint(LogicalDecodingContext *ctx) { XLogRecPtr startptr; + ReplicationSlot *slot = ctx->slot; /* Initialize from where to start reading WAL. */ - startptr = ctx->slot->data.restart_lsn; + startptr = slot->data.restart_lsn; elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X", - (uint32) (ctx->slot->data.restart_lsn >> 32), - (uint32) ctx->slot->data.restart_lsn); + (uint32) (slot->data.restart_lsn >> 32), + (uint32) slot->data.restart_lsn); /* Wait for a consistent starting point */ for (;;) @@ -477,7 +480,9 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) CHECK_FOR_INTERRUPTS(); } - ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr; + SpinLockAcquire(&slot->mutex); + slot->data.confirmed_flush = ctx->reader->EndRecPtr; + SpinLockRelease(&slot->mutex); } /* diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 056628fe8e..79d7a57d67 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -1016,7 +1016,9 @@ ReplicationSlotReserveWal(void) XLogRecPtr flushptr; /* start at current insert position */ + SpinLockAcquire(&slot->mutex); slot->data.restart_lsn = GetXLogInsertRecPtr(); + SpinLockRelease(&slot->mutex); /* make sure we have enough information to start */ flushptr = LogStandbySnapshot(); @@ -1026,7 +1028,9 @@ ReplicationSlotReserveWal(void) } else { + SpinLockAcquire(&slot->mutex); slot->data.restart_lsn = GetRedoRecPtr(); + SpinLockRelease(&slot->mutex); } /* prevent WAL removal as fast as possible */ diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 76a88c6de7..7964ae254f 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -86,6 +86,19 @@ typedef struct ReplicationSlotPersistentData /* * Shared memory state of a single replication slot. + * + * The in-memory data of replication slots follows a locking model based + * on two linked concepts: + * - A replication slot's in_use flag is switched when added or discarded using + * the LWLock ReplicationSlotControlLock, which needs to be hold in exclusive + * mode when updating the flag by the backend owning the slot and doing the + * operation, while readers (concurrent backends not owning the slot) need + * to hold it in shared mode when looking at replication slot data. + * - Individual fields are protected by mutex where only the backend owning + * the slot is authorized to update the fields from its own slot. The + * backend owning the slot does not need to take this lock when reading its + * own fields, while concurrent backends not owning this slot should take the + * lock when reading this slot's data. */ typedef struct ReplicationSlot {