diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out index 523621a705..3da6b0be63 100644 --- a/contrib/test_decoding/expected/slot.out +++ b/contrib/test_decoding/expected/slot.out @@ -150,3 +150,237 @@ SELECT pg_drop_replication_slot('regression_slot3'); (1 row) +-- +-- Test copy functions for logical replication slots +-- +-- Create and copy logical slots +SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot1', 'test_decoding', false); + ?column? +---------- + init +(1 row) + +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_no_change'); + ?column? +---------- + copy +(1 row) + +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin', false, 'pgoutput'); + ?column? +---------- + copy +(1 row) + +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin_temp', true, 'pgoutput'); + ?column? +---------- + copy +(1 row) + +-- Check all copied slots status +SELECT + o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary +FROM + (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o + LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn AND o.confirmed_flush_lsn = c.confirmed_flush_lsn +WHERE + o.slot_name != c.slot_name +ORDER BY o.slot_name, c.slot_name; + slot_name | plugin | temporary | slot_name | plugin | temporary +------------+---------------+-----------+---------------------------------+---------------+----------- + orig_slot1 | test_decoding | f | copied_slot1_change_plugin | pgoutput | f + orig_slot1 | test_decoding | f | copied_slot1_change_plugin_temp | pgoutput | t + orig_slot1 | test_decoding | f | copied_slot1_no_change | test_decoding | f +(3 rows) + +-- Now we have maximum 4 replication slots. Check slots are properly +-- released even when raise error during creating the target slot. +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error +ERROR: all replication slots are in use +HINT: Free one or increase max_replication_slots. +-- temporary slots were dropped automatically +SELECT pg_drop_replication_slot('orig_slot1'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('copied_slot1_no_change'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('copied_slot1_change_plugin'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +-- Test based on the temporary logical slot +SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot2', 'test_decoding', true); + ?column? +---------- + init +(1 row) + +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_no_change'); + ?column? +---------- + copy +(1 row) + +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin', true, 'pgoutput'); + ?column? +---------- + copy +(1 row) + +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin_temp', false, 'pgoutput'); + ?column? +---------- + copy +(1 row) + +-- Check all copied slots status +SELECT + o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary +FROM + (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o + LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn AND o.confirmed_flush_lsn = c.confirmed_flush_lsn +WHERE + o.slot_name != c.slot_name +ORDER BY o.slot_name, c.slot_name; + slot_name | plugin | temporary | slot_name | plugin | temporary +------------+---------------+-----------+---------------------------------+---------------+----------- + orig_slot2 | test_decoding | t | copied_slot2_change_plugin | pgoutput | t + orig_slot2 | test_decoding | t | copied_slot2_change_plugin_temp | pgoutput | f + orig_slot2 | test_decoding | t | copied_slot2_no_change | test_decoding | t +(3 rows) + +-- Cannot copy a logical slot to a physical slot +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'failed'); -- error +ERROR: cannot copy physical replication slot "orig_slot2" as a logical replication slot +-- temporary slots were dropped automatically +SELECT pg_drop_replication_slot('copied_slot2_change_plugin_temp'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +-- +-- Test copy functions for physical replication slots +-- +-- Create and copy physical slots +SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot1', true); + ?column? +---------- + init +(1 row) + +SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', false); + ?column? +---------- + init +(1 row) + +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_no_change'); + ?column? +---------- + copy +(1 row) + +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_temp', true); + ?column? +---------- + copy +(1 row) + +-- Check all copied slots status. Since all slots don't reserve WAL we check only other fields. +SELECT slot_name, slot_type, temporary FROM pg_replication_slots; + slot_name | slot_type | temporary +------------------------+-----------+----------- + orig_slot1 | physical | f + orig_slot2 | physical | f + copied_slot1_no_change | physical | f + copied_slot1_temp | physical | t +(4 rows) + +-- Cannot copy a physical slot to a logical slot +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error +ERROR: cannot copy logical replication slot "orig_slot1" as a physical replication slot +-- Cannot copy a physical slot that doesn't reserve WAL +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'failed'); -- error +ERROR: cannot copy a replication slot that doesn't reserve WAL +-- temporary slots were dropped automatically +SELECT pg_drop_replication_slot('orig_slot1'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('orig_slot2'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('copied_slot1_no_change'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +-- Test based on the temporary physical slot +SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', true, true); + ?column? +---------- + init +(1 row) + +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_no_change'); + ?column? +---------- + copy +(1 row) + +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_notemp', false); + ?column? +---------- + copy +(1 row) + +-- Check all copied slots status +SELECT + o.slot_name, o.temporary, c.slot_name, c.temporary +FROM + (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o + LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn +WHERE + o.slot_name != c.slot_name +ORDER BY o.slot_name, c.slot_name; + slot_name | temporary | slot_name | temporary +------------+-----------+------------------------+----------- + orig_slot2 | t | copied_slot2_no_change | t + orig_slot2 | t | copied_slot2_notemp | f +(2 rows) + +SELECT pg_drop_replication_slot('orig_slot2'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('copied_slot2_no_change'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('copied_slot2_notemp'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql index c8d08f8541..6d83fb2678 100644 --- a/contrib/test_decoding/sql/slot.sql +++ b/contrib/test_decoding/sql/slot.sql @@ -76,3 +76,97 @@ SELECT slot_name FROM pg_create_physical_replication_slot('regression_slot3'); SELECT pg_replication_slot_advance('regression_slot3', '0/0'); -- invalid LSN SELECT pg_replication_slot_advance('regression_slot3', '0/1'); -- error SELECT pg_drop_replication_slot('regression_slot3'); + +-- +-- Test copy functions for logical replication slots +-- + +-- Create and copy logical slots +SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot1', 'test_decoding', false); +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_no_change'); +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin', false, 'pgoutput'); +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin_temp', true, 'pgoutput'); + +-- Check all copied slots status +SELECT + o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary +FROM + (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o + LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn AND o.confirmed_flush_lsn = c.confirmed_flush_lsn +WHERE + o.slot_name != c.slot_name +ORDER BY o.slot_name, c.slot_name; + +-- Now we have maximum 4 replication slots. Check slots are properly +-- released even when raise error during creating the target slot. +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error + +-- temporary slots were dropped automatically +SELECT pg_drop_replication_slot('orig_slot1'); +SELECT pg_drop_replication_slot('copied_slot1_no_change'); +SELECT pg_drop_replication_slot('copied_slot1_change_plugin'); + +-- Test based on the temporary logical slot +SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot2', 'test_decoding', true); +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_no_change'); +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin', true, 'pgoutput'); +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin_temp', false, 'pgoutput'); + +-- Check all copied slots status +SELECT + o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary +FROM + (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o + LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn AND o.confirmed_flush_lsn = c.confirmed_flush_lsn +WHERE + o.slot_name != c.slot_name +ORDER BY o.slot_name, c.slot_name; + +-- Cannot copy a logical slot to a physical slot +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'failed'); -- error + +-- temporary slots were dropped automatically +SELECT pg_drop_replication_slot('copied_slot2_change_plugin_temp'); + +-- +-- Test copy functions for physical replication slots +-- + +-- Create and copy physical slots +SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot1', true); +SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', false); +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_no_change'); +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_temp', true); + +-- Check all copied slots status. Since all slots don't reserve WAL we check only other fields. +SELECT slot_name, slot_type, temporary FROM pg_replication_slots; + +-- Cannot copy a physical slot to a logical slot +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error + +-- Cannot copy a physical slot that doesn't reserve WAL +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'failed'); -- error + +-- temporary slots were dropped automatically +SELECT pg_drop_replication_slot('orig_slot1'); +SELECT pg_drop_replication_slot('orig_slot2'); +SELECT pg_drop_replication_slot('copied_slot1_no_change'); + +-- Test based on the temporary physical slot +SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', true, true); +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_no_change'); +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_notemp', false); + +-- Check all copied slots status +SELECT + o.slot_name, o.temporary, c.slot_name, c.temporary +FROM + (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o + LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn +WHERE + o.slot_name != c.slot_name +ORDER BY o.slot_name, c.slot_name; + +SELECT pg_drop_replication_slot('orig_slot2'); +SELECT pg_drop_replication_slot('copied_slot2_no_change'); +SELECT pg_drop_replication_slot('copied_slot2_notemp'); diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 2aa1d1fc29..5c3724ab9e 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -20431,6 +20431,47 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup()); + + + + pg_copy_physical_replication_slot + + pg_copy_physical_replication_slot(src_slot_name name, dst_slot_name name , temporary boolean) + + + (slot_name name, lsn pg_lsn) + + + Copies an existing physical replication slot name src_slot_name + to a physical replication slot named dst_slot_name. + The copied physical slot starts to reserve WAL from the same LSN as the + source slot. + temporary is optional. If temporary + is omitted, the same value as the source slot is used. + + + + + + + pg_copy_logical_replication_slot + + pg_copy_logical_replication_slot(src_slot_name name, dst_slot_name name , temporary boolean , plugin name) + + + (slot_name name, lsn pg_lsn) + + + Copies an existing logical replication slot name src_slot_name + to a logical replication slot named dst_slot_name + while changing the output plugin and persistence. The copied logical slot starts + from the same LSN as the source logical slot. Both + temporary and plugin are optional. + If temporary or plugin are omitted, + the same values as the source logical slot are used. + + + diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 6e5bc12e77..424fe86a1b 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -211,11 +211,15 @@ StartupDecodingContext(List *output_plugin_options, /* * Create a new decoding context, for a new logical slot. * - * plugin contains the name of the output plugin - * output_plugin_options contains options passed to the output plugin - * read_page, prepare_write, do_write, update_progress - * callbacks that have to be filled to perform the use-case dependent, - * actual, work. + * plugin -- contains the name of the output plugin + * output_plugin_options -- contains options passed to the output plugin + * restart_lsn -- if given as invalid, it's this routine's responsibility to + * mark WAL as reserved by setting a convenient restart_lsn for the slot. + * Otherwise, we set for decoding to start from the given LSN without + * marking WAL reserved beforehand. In that scenario, it's up to the + * caller to guarantee that WAL remains available. + * read_page, prepare_write, do_write, update_progress -- + * callbacks that perform the use-case dependent, actual, work. * * Needs to be called while in a memory context that's at least as long lived * as the decoding context because further memory contexts will be created @@ -228,6 +232,7 @@ LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, + XLogRecPtr restart_lsn, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, @@ -271,7 +276,14 @@ CreateInitDecodingContext(char *plugin, StrNCpy(NameStr(slot->data.plugin), plugin, NAMEDATALEN); SpinLockRelease(&slot->mutex); - ReplicationSlotReserveWal(); + if (XLogRecPtrIsInvalid(restart_lsn)) + ReplicationSlotReserveWal(); + else + { + SpinLockAcquire(&slot->mutex); + slot->data.restart_lsn = restart_lsn; + SpinLockRelease(&slot->mutex); + } /* ---- * This is a bit tricky: We need to determine a safe xmin horizon to start @@ -316,7 +328,7 @@ CreateInitDecodingContext(char *plugin, ReplicationSlotMarkDirty(); ReplicationSlotSave(); - ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon, + ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon, need_full_snapshot, false, read_page, prepare_write, do_write, update_progress); diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 224dd920c8..d7c53c54bd 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -10,13 +10,12 @@ * *------------------------------------------------------------------------- */ - #include "postgres.h" +#include "access/htup_details.h" +#include "access/xlog_internal.h" #include "funcapi.h" #include "miscadmin.h" - -#include "access/htup_details.h" #include "replication/decode.h" #include "replication/slot.h" #include "replication/logical.h" @@ -35,6 +34,38 @@ check_permissions(void) (errmsg("must be superuser or replication role to use replication slots")))); } +/* + * Helper function for creating a new physical replication slot with + * given arguments. Note that this function doesn't release the created + * slot. + * + * If restart_lsn is a valid value, we use it without WAL reservation + * routine. So the caller must guarantee that WAL is available. + */ +static void +create_physical_replication_slot(char *name, bool immediately_reserve, + bool temporary, XLogRecPtr restart_lsn) +{ + Assert(!MyReplicationSlot); + + /* acquire replication slot, this will check for conflicting names */ + ReplicationSlotCreate(name, false, + temporary ? RS_TEMPORARY : RS_PERSISTENT); + + if (immediately_reserve) + { + /* Reserve WAL as the user asked for it */ + if (XLogRecPtrIsInvalid(restart_lsn)) + ReplicationSlotReserveWal(); + else + MyReplicationSlot->data.restart_lsn = restart_lsn; + + /* Write this slot to disk */ + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + } +} + /* * SQL function for creating a new physical (streaming replication) * replication slot. @@ -51,8 +82,6 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) HeapTuple tuple; Datum result; - Assert(!MyReplicationSlot); - if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) elog(ERROR, "return type must be a row type"); @@ -60,29 +89,21 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) CheckSlotRequirements(); - /* acquire replication slot, this will check for conflicting names */ - ReplicationSlotCreate(NameStr(*name), false, - temporary ? RS_TEMPORARY : RS_PERSISTENT); + create_physical_replication_slot(NameStr(*name), + immediately_reserve, + temporary, + InvalidXLogRecPtr); values[0] = NameGetDatum(&MyReplicationSlot->data.name); nulls[0] = false; if (immediately_reserve) { - /* Reserve WAL as the user asked for it */ - ReplicationSlotReserveWal(); - - /* Write this slot to disk */ - ReplicationSlotMarkDirty(); - ReplicationSlotSave(); - values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn); nulls[1] = false; } else - { nulls[1] = true; - } tuple = heap_form_tuple(tupdesc, values, nulls); result = HeapTupleGetDatum(tuple); @@ -94,32 +115,18 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) /* - * SQL function for creating a new logical replication slot. + * Helper function for creating a new logical replication slot with + * given arguments. Note that this function doesn't release the created + * slot. */ -Datum -pg_create_logical_replication_slot(PG_FUNCTION_ARGS) +static void +create_logical_replication_slot(char *name, char *plugin, + bool temporary, XLogRecPtr restart_lsn) { - Name name = PG_GETARG_NAME(0); - Name plugin = PG_GETARG_NAME(1); - bool temporary = PG_GETARG_BOOL(2); - LogicalDecodingContext *ctx = NULL; - TupleDesc tupdesc; - HeapTuple tuple; - Datum result; - Datum values[2]; - bool nulls[2]; - Assert(!MyReplicationSlot); - if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) - elog(ERROR, "return type must be a row type"); - - check_permissions(); - - CheckLogicalDecodingRequirements(); - /* * Acquire a logical decoding slot, this will check for conflicting names. * Initially create persistent slot as ephemeral - that allows us to @@ -128,25 +135,54 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) * slots can be created as temporary from beginning as they get dropped on * error as well. */ - ReplicationSlotCreate(NameStr(*name), true, + ReplicationSlotCreate(name, true, temporary ? RS_TEMPORARY : RS_EPHEMERAL); /* * Create logical decoding context, to build the initial snapshot. */ - ctx = CreateInitDecodingContext(NameStr(*plugin), NIL, + ctx = CreateInitDecodingContext(plugin, NIL, false, /* do not build snapshot */ + restart_lsn, logical_read_local_xlog_page, NULL, NULL, NULL); /* build initial snapshot, might take a while */ DecodingContextFindStartpoint(ctx); - values[0] = NameGetDatum(&MyReplicationSlot->data.name); - values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush); - /* don't need the decoding context anymore */ FreeDecodingContext(ctx); +} + +/* + * SQL function for creating a new logical replication slot. + */ +Datum +pg_create_logical_replication_slot(PG_FUNCTION_ARGS) +{ + Name name = PG_GETARG_NAME(0); + Name plugin = PG_GETARG_NAME(1); + bool temporary = PG_GETARG_BOOL(2); + Datum result; + TupleDesc tupdesc; + HeapTuple tuple; + Datum values[2]; + bool nulls[2]; + + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + check_permissions(); + + CheckLogicalDecodingRequirements(); + + create_logical_replication_slot(NameStr(*name), + NameStr(*plugin), + temporary, + InvalidXLogRecPtr); + + values[0] = NameGetDatum(&MyReplicationSlot->data.name); + values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush); memset(nulls, 0, sizeof(nulls)); @@ -558,3 +594,235 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS) PG_RETURN_DATUM(result); } + +/* + * Helper function of copying a replication slot. + */ +static Datum +copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) +{ + Name src_name = PG_GETARG_NAME(0); + Name dst_name = PG_GETARG_NAME(1); + ReplicationSlot *src = NULL; + XLogRecPtr src_restart_lsn; + bool src_islogical; + bool temporary; + char *plugin; + Datum values[2]; + bool nulls[2]; + Datum result; + TupleDesc tupdesc; + HeapTuple tuple; + + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + check_permissions(); + + if (logical_slot) + CheckLogicalDecodingRequirements(); + else + CheckSlotRequirements(); + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + + /* + * We need to prevent the source slot's reserved WAL from being removed, + * but we don't want to lock that slot for very long, and it can advance + * in the meantime. So obtain the source slot's data, and create a new + * slot using its restart_lsn. Afterwards we lock the source slot again + * and verify that the data we copied (name, type) has not changed + * incompatibly. No inconvenient WAL removal can occur once the new slot + * is created -- but since WAL removal could have occurred before we + * managed to create the new slot, we advance the new slot's restart_lsn + * to the source slot's updated restart_lsn the second time we lock it. + */ + for (int i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0) + { + SpinLockAcquire(&s->mutex); + src_islogical = SlotIsLogical(s); + src_restart_lsn = s->data.restart_lsn; + temporary = s->data.persistency == RS_TEMPORARY; + plugin = logical_slot ? pstrdup(NameStr(s->data.plugin)) : NULL; + SpinLockRelease(&s->mutex); + + src = s; + break; + } + } + + LWLockRelease(ReplicationSlotControlLock); + + if (src == NULL) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("replication slot \"%s\" does not exist", NameStr(*src_name)))); + + /* Check type of replication slot */ + if (src_islogical != logical_slot) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + src_islogical ? + errmsg("cannot copy physical replication slot \"%s\" as a logical replication slot", + NameStr(*src_name)) : + errmsg("cannot copy logical replication slot \"%s\" as a physical replication slot", + NameStr(*src_name)))); + + /* Copying non-reserved slot doesn't make sense */ + if (XLogRecPtrIsInvalid(src_restart_lsn)) + { + Assert(!logical_slot); + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + (errmsg("cannot copy a replication slot that doesn't reserve WAL")))); + } + + /* Overwrite params from optional arguments */ + if (PG_NARGS() >= 3) + temporary = PG_GETARG_BOOL(2); + if (PG_NARGS() >= 4) + { + Assert(logical_slot); + plugin = NameStr(*(PG_GETARG_NAME(3))); + } + + /* Create new slot and acquire it */ + if (logical_slot) + create_logical_replication_slot(NameStr(*dst_name), + plugin, + temporary, + src_restart_lsn); + else + create_physical_replication_slot(NameStr(*dst_name), + true, + temporary, + src_restart_lsn); + + /* + * Update the destination slot to current values of the source slot; + * recheck that the source slot is still the one we saw previously. + */ + { + TransactionId copy_effective_xmin; + TransactionId copy_effective_catalog_xmin; + TransactionId copy_xmin; + TransactionId copy_catalog_xmin; + XLogRecPtr copy_restart_lsn; + bool copy_islogical; + char *copy_name; + + /* Copy data of source slot again */ + SpinLockAcquire(&src->mutex); + copy_effective_xmin = src->effective_xmin; + copy_effective_catalog_xmin = src->effective_catalog_xmin; + + copy_xmin = src->data.xmin; + copy_catalog_xmin = src->data.catalog_xmin; + copy_restart_lsn = src->data.restart_lsn; + + /* for existence check */ + copy_name = pstrdup(NameStr(src->data.name)); + copy_islogical = SlotIsLogical(src); + SpinLockRelease(&src->mutex); + + /* + * Check if the source slot still exists and is valid. We regards it + * as invalid if the type of replication slot or name has been + * changed, or the restart_lsn either is invalid or has gone backward. + * (The restart_lsn could go backwards if the source slot is dropped + * and copied from an older slot during installation.) + * + * Since erroring out will release and drop the destination slot we + * don't need to release it here. + */ + if (copy_restart_lsn < src_restart_lsn || + src_islogical != copy_islogical || + strcmp(copy_name, NameStr(*src_name)) != 0) + ereport(ERROR, + (errmsg("could not copy replication slot \"%s\"", + NameStr(*src_name)), + errdetail("The source replication slot was modified incompatibly during the copy operation."))); + + /* Install copied values again */ + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->effective_xmin = copy_effective_xmin; + MyReplicationSlot->effective_catalog_xmin = copy_effective_catalog_xmin; + + MyReplicationSlot->data.xmin = copy_xmin; + MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin; + MyReplicationSlot->data.restart_lsn = copy_restart_lsn; + SpinLockRelease(&MyReplicationSlot->mutex); + + ReplicationSlotMarkDirty(); + ReplicationSlotsComputeRequiredXmin(false); + ReplicationSlotsComputeRequiredLSN(); + ReplicationSlotSave(); + +#ifdef USE_ASSERT_CHECKING + /* Check that the restart_lsn is available */ + { + XLogSegNo segno; + + XLByteToSeg(copy_restart_lsn, segno, wal_segment_size); + Assert(XLogGetLastRemovedSegno() < segno); + } +#endif + } + + /* target slot fully created, mark as persistent if needed */ + if (logical_slot && !temporary) + ReplicationSlotPersist(); + + /* All done. Set up the return values */ + values[0] = NameGetDatum(dst_name); + nulls[0] = false; + if (!XLogRecPtrIsInvalid(MyReplicationSlot->data.confirmed_flush)) + { + values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush); + nulls[1] = false; + } + else + nulls[1] = true; + + tuple = heap_form_tuple(tupdesc, values, nulls); + result = HeapTupleGetDatum(tuple); + + ReplicationSlotRelease(); + + PG_RETURN_DATUM(result); +} + +/* The wrappers below are all to appease opr_sanity */ +Datum +pg_copy_logical_replication_slot_a(PG_FUNCTION_ARGS) +{ + return copy_replication_slot(fcinfo, true); +} + +Datum +pg_copy_logical_replication_slot_b(PG_FUNCTION_ARGS) +{ + return copy_replication_slot(fcinfo, true); +} + +Datum +pg_copy_logical_replication_slot_c(PG_FUNCTION_ARGS) +{ + return copy_replication_slot(fcinfo, true); +} + +Datum +pg_copy_physical_replication_slot_a(PG_FUNCTION_ARGS) +{ + return copy_replication_slot(fcinfo, false); +} + +Datum +pg_copy_physical_replication_slot_b(PG_FUNCTION_ARGS) +{ + return copy_replication_slot(fcinfo, false); +} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 21f5c868f1..aae6adc15c 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -934,6 +934,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) } ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot, + InvalidXLogRecPtr, logical_read_xlog_page, WalSndPrepareWrite, WalSndWriteData, WalSndUpdateProgress); diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 1fe54c2665..bfd2bfc186 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 201904031 +#define CATALOG_VERSION_NO 201904051 #endif diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index fb257c17c8..ad4519e001 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -9774,6 +9774,20 @@ proargmodes => '{i,i,i,o,o}', proargnames => '{slot_name,immediately_reserve,temporary,slot_name,lsn}', prosrc => 'pg_create_physical_replication_slot' }, +{ oid => '4220', descr => 'copy a physical replication slot, changing temporality', + proname => 'pg_copy_physical_replication_slot', provolatile => 'v', + proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool', + proallargtypes => '{name,name,bool,name,pg_lsn}', + proargmodes => '{i,i,i,o,o}', + proargnames => '{src_slot_name,dst_slot_name,temporary,slot_name,lsn}', + prosrc => 'pg_copy_physical_replication_slot_a' }, +{ oid => '4221', descr => 'copy a physical replication slot', + proname => 'pg_copy_physical_replication_slot', provolatile => 'v', + proparallel => 'u', prorettype => 'record', proargtypes => 'name name', + proallargtypes => '{name,name,name,pg_lsn}', + proargmodes => '{i,i,o,o}', + proargnames => '{src_slot_name,dst_slot_name,slot_name,lsn}', + prosrc => 'pg_copy_physical_replication_slot_b' }, { oid => '3780', descr => 'drop a replication slot', proname => 'pg_drop_replication_slot', provolatile => 'v', proparallel => 'u', prorettype => 'void', proargtypes => 'name', @@ -9794,6 +9808,27 @@ proargmodes => '{i,i,i,o,o}', proargnames => '{slot_name,plugin,temporary,slot_name,lsn}', prosrc => 'pg_create_logical_replication_slot' }, +{ oid => '4222', descr => 'copy a logical replication slot, changing temporality and plugin', + proname => 'pg_copy_logical_replication_slot', provolatile => 'v', + proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool name', + proallargtypes => '{name,name,bool,name,name,pg_lsn}', + proargmodes => '{i,i,i,i,o,o}', + proargnames => '{src_slot_name,dst_slot_name,temporary,plugin,slot_name,lsn}', + prosrc => 'pg_copy_logical_replication_slot_a' }, +{ oid => '4223', descr => 'copy a logical replication slot, changing temporality', + proname => 'pg_copy_logical_replication_slot', provolatile => 'v', + proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool', + proallargtypes => '{name,name,bool,name,pg_lsn}', + proargmodes => '{i,i,i,o,o}', + proargnames => '{src_slot_name,dst_slot_name,temporary,slot_name,lsn}', + prosrc => 'pg_copy_logical_replication_slot_b' }, +{ oid => '4224', descr => 'copy a logical replication slot', + proname => 'pg_copy_logical_replication_slot', provolatile => 'v', + proparallel => 'u', prorettype => 'record', proargtypes => 'name name', + proallargtypes => '{name,name,name,pg_lsn}', + proargmodes => '{i,i,o,o}', + proargnames => '{src_slot_name,dst_slot_name,slot_name,lsn}', + prosrc => 'pg_copy_logical_replication_slot_c' }, { oid => '3782', descr => 'get changes from replication slot', proname => 'pg_logical_slot_get_changes', procost => '1000', prorows => '1000', provariadic => 'text', proisstrict => 'f', diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index c8ffc4c434..0a2a63a48c 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -97,6 +97,7 @@ extern void CheckLogicalDecodingRequirements(void); extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, + XLogRecPtr restart_lsn, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write,