Allow to enable failover property for replication slots via SQL API.

This commit adds the failover property to the replication slot. The
failover property indicates whether the slot will be synced to the standby
servers, enabling the resumption of corresponding logical replication
after failover. But note that this commit does not yet include the
capability to sync the replication slot; the subsequent commits will add
that capability.

A new optional parameter 'failover' is added to the
pg_create_logical_replication_slot() function. We will also enable to set
'failover' option for slots via the subscription commands in the
subsequent commits.

The value of the 'failover' flag is displayed as part of
pg_replication_slots view.

Author: Hou Zhijie, Shveta Malik, Ajin Cherian
Reviewed-by: Peter Smith, Bertrand Drouvot, Dilip Kumar, Masahiko Sawada, Nisha Moond, Kuroda, Hayato, Amit Kapila
Discussion: https://postgr.es/m/514f6f2f-6833-4539-39f1-96cd1e011f23@enterprisedb.com
This commit is contained in:
Amit Kapila 2024-01-25 12:15:46 +05:30
parent 86232a49a4
commit c393308b69
16 changed files with 141 additions and 25 deletions

View File

@ -406,3 +406,61 @@ SELECT pg_drop_replication_slot('copied_slot2_notemp');
(1 row) (1 row)
-- Test failover option of slots.
SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_slot', 'test_decoding', false, false, true);
?column?
----------
init
(1 row)
SELECT 'init' FROM pg_create_logical_replication_slot('failover_false_slot', 'test_decoding', false, false, false);
?column?
----------
init
(1 row)
SELECT 'init' FROM pg_create_logical_replication_slot('failover_default_slot', 'test_decoding', false, false);
?column?
----------
init
(1 row)
SELECT 'init' FROM pg_create_physical_replication_slot('physical_slot');
?column?
----------
init
(1 row)
SELECT slot_name, slot_type, failover FROM pg_replication_slots;
slot_name | slot_type | failover
-----------------------+-----------+----------
failover_true_slot | logical | t
failover_false_slot | logical | f
failover_default_slot | logical | f
physical_slot | physical | f
(4 rows)
SELECT pg_drop_replication_slot('failover_true_slot');
pg_drop_replication_slot
--------------------------
(1 row)
SELECT pg_drop_replication_slot('failover_false_slot');
pg_drop_replication_slot
--------------------------
(1 row)
SELECT pg_drop_replication_slot('failover_default_slot');
pg_drop_replication_slot
--------------------------
(1 row)
SELECT pg_drop_replication_slot('physical_slot');
pg_drop_replication_slot
--------------------------
(1 row)

View File

@ -176,3 +176,16 @@ ORDER BY o.slot_name, c.slot_name;
SELECT pg_drop_replication_slot('orig_slot2'); SELECT pg_drop_replication_slot('orig_slot2');
SELECT pg_drop_replication_slot('copied_slot2_no_change'); SELECT pg_drop_replication_slot('copied_slot2_no_change');
SELECT pg_drop_replication_slot('copied_slot2_notemp'); SELECT pg_drop_replication_slot('copied_slot2_notemp');
-- Test failover option of slots.
SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_slot', 'test_decoding', false, false, true);
SELECT 'init' FROM pg_create_logical_replication_slot('failover_false_slot', 'test_decoding', false, false, false);
SELECT 'init' FROM pg_create_logical_replication_slot('failover_default_slot', 'test_decoding', false, false);
SELECT 'init' FROM pg_create_physical_replication_slot('physical_slot');
SELECT slot_name, slot_type, failover FROM pg_replication_slots;
SELECT pg_drop_replication_slot('failover_true_slot');
SELECT pg_drop_replication_slot('failover_false_slot');
SELECT pg_drop_replication_slot('failover_default_slot');
SELECT pg_drop_replication_slot('physical_slot');

View File

@ -27707,7 +27707,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
<indexterm> <indexterm>
<primary>pg_create_logical_replication_slot</primary> <primary>pg_create_logical_replication_slot</primary>
</indexterm> </indexterm>
<function>pg_create_logical_replication_slot</function> ( <parameter>slot_name</parameter> <type>name</type>, <parameter>plugin</parameter> <type>name</type> <optional>, <parameter>temporary</parameter> <type>boolean</type>, <parameter>twophase</parameter> <type>boolean</type> </optional> ) <function>pg_create_logical_replication_slot</function> ( <parameter>slot_name</parameter> <type>name</type>, <parameter>plugin</parameter> <type>name</type> <optional>, <parameter>temporary</parameter> <type>boolean</type>, <parameter>twophase</parameter> <type>boolean</type>, <parameter>failover</parameter> <type>boolean</type> </optional> )
<returnvalue>record</returnvalue> <returnvalue>record</returnvalue>
( <parameter>slot_name</parameter> <type>name</type>, ( <parameter>slot_name</parameter> <type>name</type>,
<parameter>lsn</parameter> <type>pg_lsn</type> ) <parameter>lsn</parameter> <type>pg_lsn</type> )
@ -27722,8 +27722,13 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
released upon any error. The optional fourth parameter, released upon any error. The optional fourth parameter,
<parameter>twophase</parameter>, when set to true, specifies <parameter>twophase</parameter>, when set to true, specifies
that the decoding of prepared transactions is enabled for this that the decoding of prepared transactions is enabled for this
slot. A call to this function has the same effect as the replication slot. The optional fifth parameter,
protocol command <literal>CREATE_REPLICATION_SLOT ... LOGICAL</literal>. <parameter>failover</parameter>, when set to true,
specifies that this slot is enabled to be synced to the
standbys so that logical replication can be resumed after
failover. A call to this function has the same effect as
the replication protocol command
<literal>CREATE_REPLICATION_SLOT ... LOGICAL</literal>.
</para></entry> </para></entry>
</row> </row>

View File

@ -2555,6 +2555,16 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
</itemizedlist> </itemizedlist>
</para></entry> </para></entry>
</row> </row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>failover</structfield> <type>bool</type>
</para>
<para>
True if this is a logical slot enabled to be synced to the standbys.
Always false for physical slots.
</para></entry>
</row>
</tbody> </tbody>
</tgroup> </tgroup>
</table> </table>

View File

@ -479,6 +479,7 @@ CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot(
IN slot_name name, IN plugin name, IN slot_name name, IN plugin name,
IN temporary boolean DEFAULT false, IN temporary boolean DEFAULT false,
IN twophase boolean DEFAULT false, IN twophase boolean DEFAULT false,
IN failover boolean DEFAULT false,
OUT slot_name name, OUT lsn pg_lsn) OUT slot_name name, OUT lsn pg_lsn)
RETURNS RECORD RETURNS RECORD
LANGUAGE INTERNAL LANGUAGE INTERNAL

View File

@ -1023,7 +1023,8 @@ CREATE VIEW pg_replication_slots AS
L.wal_status, L.wal_status,
L.safe_wal_size, L.safe_wal_size,
L.two_phase, L.two_phase,
L.conflict_reason L.conflict_reason,
L.failover
FROM pg_get_replication_slots() AS L FROM pg_get_replication_slots() AS L
LEFT JOIN pg_database D ON (L.datoid = D.oid); LEFT JOIN pg_database D ON (L.datoid = D.oid);

View File

@ -90,7 +90,7 @@ typedef struct ReplicationSlotOnDisk
sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
#define SLOT_MAGIC 0x1051CA1 /* format identifier */ #define SLOT_MAGIC 0x1051CA1 /* format identifier */
#define SLOT_VERSION 3 /* version for new files */ #define SLOT_VERSION 4 /* version for new files */
/* Control array for replication slot management */ /* Control array for replication slot management */
ReplicationSlotCtlData *ReplicationSlotCtl = NULL; ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
@ -248,10 +248,13 @@ ReplicationSlotValidateName(const char *name, int elevel)
* during getting changes, if the two_phase option is enabled it can skip * during getting changes, if the two_phase option is enabled it can skip
* prepare because by that time start decoding point has been moved. So the * prepare because by that time start decoding point has been moved. So the
* user will only get commit prepared. * user will only get commit prepared.
* failover: If enabled, allows the slot to be synced to standbys so
* that logical replication can be resumed after failover.
*/ */
void void
ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotCreate(const char *name, bool db_specific,
ReplicationSlotPersistency persistency, bool two_phase) ReplicationSlotPersistency persistency,
bool two_phase, bool failover)
{ {
ReplicationSlot *slot = NULL; ReplicationSlot *slot = NULL;
int i; int i;
@ -311,6 +314,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
slot->data.persistency = persistency; slot->data.persistency = persistency;
slot->data.two_phase = two_phase; slot->data.two_phase = two_phase;
slot->data.two_phase_at = InvalidXLogRecPtr; slot->data.two_phase_at = InvalidXLogRecPtr;
slot->data.failover = failover;
/* and then data only present in shared memory */ /* and then data only present in shared memory */
slot->just_dirtied = false; slot->just_dirtied = false;

View File

@ -42,7 +42,8 @@ create_physical_replication_slot(char *name, bool immediately_reserve,
/* acquire replication slot, this will check for conflicting names */ /* acquire replication slot, this will check for conflicting names */
ReplicationSlotCreate(name, false, ReplicationSlotCreate(name, false,
temporary ? RS_TEMPORARY : RS_PERSISTENT, false); temporary ? RS_TEMPORARY : RS_PERSISTENT, false,
false);
if (immediately_reserve) if (immediately_reserve)
{ {
@ -117,6 +118,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
static void static void
create_logical_replication_slot(char *name, char *plugin, create_logical_replication_slot(char *name, char *plugin,
bool temporary, bool two_phase, bool temporary, bool two_phase,
bool failover,
XLogRecPtr restart_lsn, XLogRecPtr restart_lsn,
bool find_startpoint) bool find_startpoint)
{ {
@ -133,7 +135,8 @@ create_logical_replication_slot(char *name, char *plugin,
* error as well. * error as well.
*/ */
ReplicationSlotCreate(name, true, ReplicationSlotCreate(name, true,
temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase); temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
failover);
/* /*
* Create logical decoding context to find start point or, if we don't * Create logical decoding context to find start point or, if we don't
@ -171,6 +174,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
Name plugin = PG_GETARG_NAME(1); Name plugin = PG_GETARG_NAME(1);
bool temporary = PG_GETARG_BOOL(2); bool temporary = PG_GETARG_BOOL(2);
bool two_phase = PG_GETARG_BOOL(3); bool two_phase = PG_GETARG_BOOL(3);
bool failover = PG_GETARG_BOOL(4);
Datum result; Datum result;
TupleDesc tupdesc; TupleDesc tupdesc;
HeapTuple tuple; HeapTuple tuple;
@ -188,6 +192,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
NameStr(*plugin), NameStr(*plugin),
temporary, temporary,
two_phase, two_phase,
failover,
InvalidXLogRecPtr, InvalidXLogRecPtr,
true); true);
@ -232,7 +237,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
Datum Datum
pg_get_replication_slots(PG_FUNCTION_ARGS) pg_get_replication_slots(PG_FUNCTION_ARGS)
{ {
#define PG_GET_REPLICATION_SLOTS_COLS 15 #define PG_GET_REPLICATION_SLOTS_COLS 16
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
XLogRecPtr currlsn; XLogRecPtr currlsn;
int slotno; int slotno;
@ -426,6 +431,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
} }
} }
values[i++] = BoolGetDatum(slot_contents.data.failover);
Assert(i == PG_GET_REPLICATION_SLOTS_COLS); Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
@ -693,6 +700,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
XLogRecPtr src_restart_lsn; XLogRecPtr src_restart_lsn;
bool src_islogical; bool src_islogical;
bool temporary; bool temporary;
bool failover;
char *plugin; char *plugin;
Datum values[2]; Datum values[2];
bool nulls[2]; bool nulls[2];
@ -748,6 +756,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
src_islogical = SlotIsLogical(&first_slot_contents); src_islogical = SlotIsLogical(&first_slot_contents);
src_restart_lsn = first_slot_contents.data.restart_lsn; src_restart_lsn = first_slot_contents.data.restart_lsn;
temporary = (first_slot_contents.data.persistency == RS_TEMPORARY); temporary = (first_slot_contents.data.persistency == RS_TEMPORARY);
failover = first_slot_contents.data.failover;
plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL; plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL;
/* Check type of replication slot */ /* Check type of replication slot */
@ -787,6 +796,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
plugin, plugin,
temporary, temporary,
false, false,
failover,
src_restart_lsn, src_restart_lsn,
false); false);
} }

View File

@ -1212,7 +1212,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
{ {
ReplicationSlotCreate(cmd->slotname, false, ReplicationSlotCreate(cmd->slotname, false,
cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT, cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
false); false, false);
if (reserve_wal) if (reserve_wal)
{ {
@ -1243,7 +1243,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
*/ */
ReplicationSlotCreate(cmd->slotname, true, ReplicationSlotCreate(cmd->slotname, true,
cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL, cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
two_phase); two_phase, false);
/* /*
* Do options check early so that we can bail before calling the * Do options check early so that we can bail before calling the

View File

@ -666,7 +666,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
* started and stopped several times causing any temporary slots to be * started and stopped several times causing any temporary slots to be
* removed. * removed.
*/ */
res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, " res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, failover, "
"%s as caught_up, conflict_reason IS NOT NULL as invalid " "%s as caught_up, conflict_reason IS NOT NULL as invalid "
"FROM pg_catalog.pg_replication_slots " "FROM pg_catalog.pg_replication_slots "
"WHERE slot_type = 'logical' AND " "WHERE slot_type = 'logical' AND "
@ -684,6 +684,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
int i_slotname; int i_slotname;
int i_plugin; int i_plugin;
int i_twophase; int i_twophase;
int i_failover;
int i_caught_up; int i_caught_up;
int i_invalid; int i_invalid;
@ -692,6 +693,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
i_slotname = PQfnumber(res, "slot_name"); i_slotname = PQfnumber(res, "slot_name");
i_plugin = PQfnumber(res, "plugin"); i_plugin = PQfnumber(res, "plugin");
i_twophase = PQfnumber(res, "two_phase"); i_twophase = PQfnumber(res, "two_phase");
i_failover = PQfnumber(res, "failover");
i_caught_up = PQfnumber(res, "caught_up"); i_caught_up = PQfnumber(res, "caught_up");
i_invalid = PQfnumber(res, "invalid"); i_invalid = PQfnumber(res, "invalid");
@ -702,6 +704,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
curr->slotname = pg_strdup(PQgetvalue(res, slotnum, i_slotname)); curr->slotname = pg_strdup(PQgetvalue(res, slotnum, i_slotname));
curr->plugin = pg_strdup(PQgetvalue(res, slotnum, i_plugin)); curr->plugin = pg_strdup(PQgetvalue(res, slotnum, i_plugin));
curr->two_phase = (strcmp(PQgetvalue(res, slotnum, i_twophase), "t") == 0); curr->two_phase = (strcmp(PQgetvalue(res, slotnum, i_twophase), "t") == 0);
curr->failover = (strcmp(PQgetvalue(res, slotnum, i_failover), "t") == 0);
curr->caught_up = (strcmp(PQgetvalue(res, slotnum, i_caught_up), "t") == 0); curr->caught_up = (strcmp(PQgetvalue(res, slotnum, i_caught_up), "t") == 0);
curr->invalid = (strcmp(PQgetvalue(res, slotnum, i_invalid), "t") == 0); curr->invalid = (strcmp(PQgetvalue(res, slotnum, i_invalid), "t") == 0);
} }

View File

@ -916,8 +916,10 @@ create_logical_replication_slots(void)
appendStringLiteralConn(query, slot_info->slotname, conn); appendStringLiteralConn(query, slot_info->slotname, conn);
appendPQExpBuffer(query, ", "); appendPQExpBuffer(query, ", ");
appendStringLiteralConn(query, slot_info->plugin, conn); appendStringLiteralConn(query, slot_info->plugin, conn);
appendPQExpBuffer(query, ", false, %s);",
slot_info->two_phase ? "true" : "false"); appendPQExpBuffer(query, ", false, %s, %s);",
slot_info->two_phase ? "true" : "false",
slot_info->failover ? "true" : "false");
PQclear(executeQueryOrDie(conn, "%s", query->data)); PQclear(executeQueryOrDie(conn, "%s", query->data));

View File

@ -160,6 +160,8 @@ typedef struct
bool two_phase; /* can the slot decode 2PC? */ bool two_phase; /* can the slot decode 2PC? */
bool caught_up; /* has the slot caught up to latest changes? */ bool caught_up; /* has the slot caught up to latest changes? */
bool invalid; /* if true, the slot is unusable */ bool invalid; /* if true, the slot is unusable */
bool failover; /* is the slot designated to be synced to the
* physical standby? */
} LogicalSlotInfo; } LogicalSlotInfo;
typedef struct typedef struct

View File

@ -57,6 +57,6 @@
*/ */
/* yyyymmddN */ /* yyyymmddN */
#define CATALOG_VERSION_NO 202401251 #define CATALOG_VERSION_NO 202401252
#endif #endif

View File

@ -11127,17 +11127,17 @@
proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', prorettype => 'record', proretset => 't', provolatile => 's', prorettype => 'record',
proargtypes => '', proargtypes => '',
proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text}', proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,bool}',
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason}', proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason,failover}',
prosrc => 'pg_get_replication_slots' }, prosrc => 'pg_get_replication_slots' },
{ oid => '3786', descr => 'set up a logical replication slot', { oid => '3786', descr => 'set up a logical replication slot',
proname => 'pg_create_logical_replication_slot', provolatile => 'v', proname => 'pg_create_logical_replication_slot', provolatile => 'v',
proparallel => 'u', prorettype => 'record', proparallel => 'u', prorettype => 'record',
proargtypes => 'name name bool bool', proargtypes => 'name name bool bool bool',
proallargtypes => '{name,name,bool,bool,name,pg_lsn}', proallargtypes => '{name,name,bool,bool,bool,name,pg_lsn}',
proargmodes => '{i,i,i,i,o,o}', proargmodes => '{i,i,i,i,i,o,o}',
proargnames => '{slot_name,plugin,temporary,twophase,slot_name,lsn}', proargnames => '{slot_name,plugin,temporary,twophase,failover,slot_name,lsn}',
prosrc => 'pg_create_logical_replication_slot' }, prosrc => 'pg_create_logical_replication_slot' },
{ oid => '4222', { oid => '4222',
descr => 'copy a logical replication slot, changing temporality and plugin', descr => 'copy a logical replication slot, changing temporality and plugin',

View File

@ -111,6 +111,12 @@ typedef struct ReplicationSlotPersistentData
/* plugin name */ /* plugin name */
NameData plugin; NameData plugin;
/*
* Is this a failover slot (sync candidate for standbys)? Only relevant
* for logical slots on the primary server.
*/
bool failover;
} ReplicationSlotPersistentData; } ReplicationSlotPersistentData;
/* /*
@ -218,7 +224,7 @@ extern void ReplicationSlotsShmemInit(void);
/* management of individual slots */ /* management of individual slots */
extern void ReplicationSlotCreate(const char *name, bool db_specific, extern void ReplicationSlotCreate(const char *name, bool db_specific,
ReplicationSlotPersistency persistency, ReplicationSlotPersistency persistency,
bool two_phase); bool two_phase, bool failover);
extern void ReplicationSlotPersist(void); extern void ReplicationSlotPersist(void);
extern void ReplicationSlotDrop(const char *name, bool nowait); extern void ReplicationSlotDrop(const char *name, bool nowait);

View File

@ -1473,8 +1473,9 @@ pg_replication_slots| SELECT l.slot_name,
l.wal_status, l.wal_status,
l.safe_wal_size, l.safe_wal_size,
l.two_phase, l.two_phase,
l.conflict_reason l.conflict_reason,
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason) l.failover
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason, failover)
LEFT JOIN pg_database d ON ((l.datoid = d.oid))); LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
pg_roles| SELECT pg_authid.rolname, pg_roles| SELECT pg_authid.rolname,
pg_authid.rolsuper, pg_authid.rolsuper,