From 9e149c847f398793ec1641885434dcd10837d89d Mon Sep 17 00:00:00 2001
From: Michael Paquier
Date: Fri, 1 Jun 2018 14:30:55 -0400
Subject: [PATCH] Fix and document lock handling for in-memory replication
 slot data

While debugging issues on HEAD for the new slot forwarding feature of
Postgres 11, a review of the code surrounding in-memory slot data showed
that the lock handling may cause inconsistent data to be read by
read-only callers of slot functions, particularly
pg_get_replication_slots(), which fetches data for the system view
pg_replication_slots, or modules looking directly at slot information.

The code paths involved in those problems concern logical decoding
initialization (down to 9.4) and WAL reservation for slots (new as of
10).

A set of comments documenting all the lock handling, particularly the
interaction between the LWLock for slots, the in_use flag and the
internal mutex lock, is added, based on a suggestion by Simon Riggs.

Some of the fixed code exists down to 9.4, where logical decoding was
introduced, but as those race conditions are very unlikely to happen,
being limited to code paths for slot and decoding creation, just fix
the problem on HEAD.

Author: Michael Paquier
Discussion: https://postgr.es/m/20180528085747.GA27845@paquier.xyz
---
 src/backend/replication/logical/logical.c | 13 +++++++++----
 src/backend/replication/slot.c            |  4 ++++
 src/include/replication/slot.h            | 13 +++++++++++++
 3 files changed, 26 insertions(+), 4 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1393591538..61588d626f 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -297,10 +297,12 @@ CreateInitDecodingContext(char *plugin,
 
 	xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
 
+	SpinLockAcquire(&slot->mutex);
 	slot->effective_catalog_xmin = xmin_horizon;
 	slot->data.catalog_xmin = xmin_horizon;
 	if (need_full_snapshot)
 		slot->effective_xmin = xmin_horizon;
+	SpinLockRelease(&slot->mutex);
 
 	ReplicationSlotsComputeRequiredXmin(true);
 
@@ -445,13 +447,14 @@ void
 DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 {
 	XLogRecPtr	startptr;
+	ReplicationSlot *slot = ctx->slot;
 
 	/* Initialize from where to start reading WAL. */
-	startptr = ctx->slot->data.restart_lsn;
+	startptr = slot->data.restart_lsn;
 
 	elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
-		 (uint32) (ctx->slot->data.restart_lsn >> 32),
-		 (uint32) ctx->slot->data.restart_lsn);
+		 (uint32) (slot->data.restart_lsn >> 32),
+		 (uint32) slot->data.restart_lsn);
 
 	/* Wait for a consistent starting point */
 	for (;;)
@@ -477,7 +480,9 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 		CHECK_FOR_INTERRUPTS();
 	}
 
-	ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+	SpinLockAcquire(&slot->mutex);
+	slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+	SpinLockRelease(&slot->mutex);
 }
 
 /*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 056628fe8e..79d7a57d67 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1016,7 +1016,9 @@ ReplicationSlotReserveWal(void)
 		XLogRecPtr	flushptr;
 
 		/* start at current insert position */
+		SpinLockAcquire(&slot->mutex);
 		slot->data.restart_lsn = GetXLogInsertRecPtr();
+		SpinLockRelease(&slot->mutex);
 
 		/* make sure we have enough information to start */
 		flushptr = LogStandbySnapshot();
@@ -1026,7 +1028,9 @@
 	}
 	else
 	{
+		SpinLockAcquire(&slot->mutex);
 		slot->data.restart_lsn = GetRedoRecPtr();
+		SpinLockRelease(&slot->mutex);
 	}
 
 	/* prevent WAL removal as fast as possible */
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 76a88c6de7..7964ae254f 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -86,6 +86,19 @@ typedef struct ReplicationSlotPersistentData
 
 /*
  * Shared memory state of a single replication slot.
+ *
+ * The in-memory data of replication slots follows a locking model based
+ * on two linked concepts:
+ * - A replication slot's in_use flag is switched when the slot is created or
+ * dropped, using the LWLock ReplicationSlotControlLock, which needs to be
+ * held in exclusive mode by the backend owning the slot when updating the
+ * flag, while readers (concurrent backends not owning the slot) need to hold
+ * it in shared mode when looking at replication slot data.
+ * - Individual fields are protected by the slot's mutex; only the backend
+ * owning the slot is authorized to update the fields of its own slot.  The
+ * backend owning the slot does not need to take this lock when reading its
+ * own fields, while concurrent backends not owning this slot should take the
+ * lock when reading this slot's data.
  */
 typedef struct ReplicationSlot
 {
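
For illustration only, not part of the patch: a minimal reader-side sketch
of the locking rules documented in the slot.h comment above. The helper
CopySlotDataByName() is hypothetical; it copies another backend's slot data
the way a read-only caller such as pg_get_replication_slots() is expected
to, holding ReplicationSlotControlLock in shared mode and the per-slot
mutex while reading the fields.

/*
 * Illustrative sketch only, not part of the patch.  CopySlotDataByName()
 * is a hypothetical helper showing how a backend that does not own a slot
 * is expected to read its in-memory data under the locking model above.
 */
#include "postgres.h"

#include "replication/slot.h"
#include "storage/lwlock.h"
#include "storage/spin.h"

static bool
CopySlotDataByName(const char *name, ReplicationSlotPersistentData *out)
{
	bool		found = false;
	int			i;

	/* Readers take ReplicationSlotControlLock in shared mode. */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		/* in_use is protected by ReplicationSlotControlLock. */
		if (!s->in_use)
			continue;

		/*
		 * Fields of a slot owned by another backend are read under the
		 * slot's own mutex, similar in spirit to pg_get_replication_slots().
		 */
		SpinLockAcquire(&s->mutex);
		if (strcmp(NameStr(s->data.name), name) == 0)
		{
			*out = s->data;
			found = true;
		}
		SpinLockRelease(&s->mutex);

		if (found)
			break;
	}

	LWLockRelease(ReplicationSlotControlLock);

	return found;
}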