From e58e13e846953e0540b1a2c9a7739b666e134997 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera Date: Tue, 17 Mar 2020 16:13:18 -0300 Subject: [PATCH] Fix consistency issues with replication slot copy Commit 9f06d79ef831's replication slot copying failed to properly reserve the WAL that the slot is expecting to see during DecodingContextFindStartpoint (to set the confirmed_flush LSN), so concurrent activity could remove that WAL and cause the copy process to error out. But it doesn't actually *need* that WAL anyway: instead of running decode to find confirmed_flush, it can be copied from the source slot. Fix this by rearranging things to avoid DecodingContextFindStartpoint() (leaving the target slot's confirmed_flush_lsn invalid), and set that up afterwards by copying from the source slot's value. Also ensure the source slot's confirmed_flush_lsn is valid. Reported-by: Arseny Sher Author: Masahiko Sawada, Arseny Sher Discussion: https://postgr.es/m/871rr3ohbo.fsf@ars-thinkpad --- src/backend/replication/logical/logical.c | 2 + src/backend/replication/slotfuncs.c | 46 +++++++++++++++++++---- 2 files changed, 41 insertions(+), 7 deletions(-) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 9853be6d1c..218dff0cfe 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -211,6 +211,8 @@ StartupDecodingContext(List *output_plugin_options, * * plugin -- contains the name of the output plugin * output_plugin_options -- contains options passed to the output plugin + * need_full_snapshot -- if true, must obtain a snapshot able to read all + * tables; if false, one that can read only catalogs is acceptable. * restart_lsn -- if given as invalid, it's this routine's responsibility to * mark WAL as reserved by setting a convenient restart_lsn for the slot. 
* Otherwise, we set for decoding to start from the given LSN without diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 3f5944f2ad..3a7cea54d5 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -118,10 +118,14 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) * Helper function for creating a new logical replication slot with * given arguments. Note that this function doesn't release the created * slot. + * + * When find_startpoint is false, the slot's confirmed_flush is not set; it's + * caller's responsibility to ensure it's set to something sensible. */ static void create_logical_replication_slot(char *name, char *plugin, - bool temporary, XLogRecPtr restart_lsn) + bool temporary, XLogRecPtr restart_lsn, + bool find_startpoint) { LogicalDecodingContext *ctx = NULL; @@ -139,16 +143,24 @@ create_logical_replication_slot(char *name, char *plugin, temporary ? RS_TEMPORARY : RS_EPHEMERAL); /* - * Create logical decoding context, to build the initial snapshot. + * Create logical decoding context to find start point or, if we don't + * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity. + * + * Note: when !find_startpoint this is still important, because it's at + * this point that the output plugin is validated. */ ctx = CreateInitDecodingContext(plugin, NIL, - false, /* do not build snapshot */ + false, /* just catalogs is OK */ restart_lsn, logical_read_local_xlog_page, NULL, NULL, NULL); - /* build initial snapshot, might take a while */ - DecodingContextFindStartpoint(ctx); + /* + * If caller needs us to determine the decoding start point, do so now. + * This might take a while. 
+ */ + if (find_startpoint) + DecodingContextFindStartpoint(ctx); /* don't need the decoding context anymore */ FreeDecodingContext(ctx); @@ -179,7 +191,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) create_logical_replication_slot(NameStr(*name), NameStr(*plugin), temporary, - InvalidXLogRecPtr); + InvalidXLogRecPtr, + true); values[0] = NameGetDatum(&MyReplicationSlot->data.name); values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush); @@ -691,10 +704,18 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) /* Create new slot and acquire it */ if (logical_slot) + { + /* + * We must not try to read WAL, since we haven't reserved it yet -- + * hence pass find_startpoint false. confirmed_flush will be set + * below, by copying from the source slot. + */ create_logical_replication_slot(NameStr(*dst_name), plugin, temporary, - src_restart_lsn); + src_restart_lsn, + false); + } else create_physical_replication_slot(NameStr(*dst_name), true, @@ -711,6 +732,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) TransactionId copy_xmin; TransactionId copy_catalog_xmin; XLogRecPtr copy_restart_lsn; + XLogRecPtr copy_confirmed_flush; bool copy_islogical; char *copy_name; @@ -722,6 +744,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) copy_xmin = src->data.xmin; copy_catalog_xmin = src->data.catalog_xmin; copy_restart_lsn = src->data.restart_lsn; + copy_confirmed_flush = src->data.confirmed_flush; /* for existence check */ copy_name = pstrdup(NameStr(src->data.name)); @@ -746,6 +769,14 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) NameStr(*src_name)), errdetail("The source replication slot was modified incompatibly during the copy operation."))); + /* The source slot must have a consistent snapshot */ + if (src_islogical && XLogRecPtrIsInvalid(copy_confirmed_flush)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot copy unfinished logical replication 
slot \"%s\"", + NameStr(*src_name)), + errhint("Retry when the source replication slot's confirmed_flush_lsn is valid."))); + /* Install copied values again */ SpinLockAcquire(&MyReplicationSlot->mutex); MyReplicationSlot->effective_xmin = copy_effective_xmin; @@ -754,6 +785,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) MyReplicationSlot->data.xmin = copy_xmin; MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin; MyReplicationSlot->data.restart_lsn = copy_restart_lsn; + MyReplicationSlot->data.confirmed_flush = copy_confirmed_flush; SpinLockRelease(&MyReplicationSlot->mutex); ReplicationSlotMarkDirty();