diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index b95cc88599..132436c6e6 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -2067,6 +2067,54 @@ The commands accepted in replication mode are: + + READ_REPLICATION_SLOT slot_name + READ_REPLICATION_SLOT + + + + Read some information associated to a replication slot. Returns a tuple + with NULL values if the replication slot does not + exist. This command is currently only supported for physical replication + slots. + + + In response to this command, the server will return a one-row result set, + containing the following fields: + + + slot_type (text) + + + The replication slot's type, either physical or + NULL. + + + + + + restart_lsn (text) + + + The replication slot's restart_lsn. + + + + + + restart_tli (int8) + + + The timeline ID associated to restart_lsn, + following the current timeline history. + + + + + + + + START_REPLICATION [ SLOT slot_name ] [ PHYSICAL ] XXX/XXX [ TIMELINE tli ] START_REPLICATION diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index 126380e2df..dcb1108579 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -64,6 +64,7 @@ static SQLCmd *make_sqlcmd(void); /* Keyword tokens. */ %token K_BASE_BACKUP %token K_IDENTIFY_SYSTEM +%token K_READ_REPLICATION_SLOT %token K_SHOW %token K_START_REPLICATION %token K_CREATE_REPLICATION_SLOT @@ -94,7 +95,7 @@ static SQLCmd *make_sqlcmd(void); %type command %type base_backup start_replication start_logical_replication create_replication_slot drop_replication_slot identify_system - timeline_history show sql_cmd + read_replication_slot timeline_history show sql_cmd %type base_backup_legacy_opt_list generic_option_list %type base_backup_legacy_opt generic_option %type opt_timeline @@ -125,6 +126,7 @@ command: | start_logical_replication | create_replication_slot | drop_replication_slot + | read_replication_slot | timeline_history | show | sql_cmd @@ -140,6 +142,18 @@ identify_system: } ; +/* + * READ_REPLICATION_SLOT %s + */ +read_replication_slot: + K_READ_REPLICATION_SLOT var_name + { + ReadReplicationSlotCmd *n = makeNode(ReadReplicationSlotCmd); + n->slotname = $2; + $$ = (Node *) n; + } + ; + /* * SHOW setting */ diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l index c038a636c3..1b599c255e 100644 --- a/src/backend/replication/repl_scanner.l +++ b/src/backend/replication/repl_scanner.l @@ -85,6 +85,7 @@ identifier {ident_start}{ident_cont}* BASE_BACKUP { return K_BASE_BACKUP; } FAST { return K_FAST; } IDENTIFY_SYSTEM { return K_IDENTIFY_SYSTEM; } +READ_REPLICATION_SLOT { return K_READ_REPLICATION_SLOT; } SHOW { return K_SHOW; } LABEL { return K_LABEL; } NOWAIT { return K_NOWAIT; } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index b811a5c0ef..d8f5f113c0 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -232,6 +232,7 @@ static void XLogSendLogical(void); static void WalSndDone(WalSndSendDataCallback send_data); static XLogRecPtr GetStandbyFlushRecPtr(void); static void IdentifySystem(void); +static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd); static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd); static void DropReplicationSlot(DropReplicationSlotCmd *cmd); static void StartReplication(StartReplicationCmd *cmd); @@ -457,6 +458,104 @@ IdentifySystem(void) end_tup_output(tstate); } +/* Handle READ_REPLICATION_SLOT command */ +static void +ReadReplicationSlot(ReadReplicationSlotCmd *cmd) +{ +#define READ_REPLICATION_SLOT_COLS 3 + ReplicationSlot *slot; + DestReceiver *dest; + TupOutputState *tstate; + TupleDesc tupdesc; + Datum values[READ_REPLICATION_SLOT_COLS]; + bool nulls[READ_REPLICATION_SLOT_COLS]; + + tupdesc = CreateTemplateTupleDesc(READ_REPLICATION_SLOT_COLS); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type", + TEXTOID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn", + TEXTOID, -1, 0); + /* TimeLineID is unsigned, so int4 is not wide enough. */ + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli", + INT8OID, -1, 0); + + MemSet(values, 0, READ_REPLICATION_SLOT_COLS * sizeof(Datum)); + MemSet(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool)); + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + slot = SearchNamedReplicationSlot(cmd->slotname, false); + if (slot == NULL || !slot->in_use) + { + LWLockRelease(ReplicationSlotControlLock); + } + else + { + ReplicationSlot slot_contents; + int i = 0; + + /* Copy slot contents while holding spinlock */ + SpinLockAcquire(&slot->mutex); + slot_contents = *slot; + SpinLockRelease(&slot->mutex); + LWLockRelease(ReplicationSlotControlLock); + + if (OidIsValid(slot_contents.data.database)) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use \"%s\" with logical replication slot \"%s\"", + "READ_REPLICATION_SLOT", + NameStr(slot_contents.data.name))); + + /* slot type */ + values[i] = CStringGetTextDatum("physical"); + nulls[i] = false; + i++; + + /* start LSN */ + if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn)) + { + char xloc[64]; + + snprintf(xloc, sizeof(xloc), "%X/%X", + LSN_FORMAT_ARGS(slot_contents.data.restart_lsn)); + values[i] = CStringGetTextDatum(xloc); + nulls[i] = false; + } + i++; + + /* timeline this WAL was produced on */ + if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn)) + { + TimeLineID slots_position_timeline; + TimeLineID current_timeline; + List *timeline_history = NIL; + + /* + * While in recovery, use as timeline the currently-replaying one + * to get the LSN position's history. + */ + if (RecoveryInProgress()) + (void) GetXLogReplayRecPtr(¤t_timeline); + else + current_timeline = ThisTimeLineID; + + timeline_history = readTimeLineHistory(current_timeline); + slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn, + timeline_history); + values[i] = Int64GetDatum((int64) slots_position_timeline); + nulls[i] = false; + } + i++; + + Assert(i == READ_REPLICATION_SLOT_COLS); + } + + dest = CreateDestReceiver(DestRemoteSimple); + tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual); + do_tup_output(tstate, values, nulls); + end_tup_output(tstate); +} + /* * Handle TIMELINE_HISTORY command. @@ -1622,6 +1721,13 @@ exec_replication_command(const char *cmd_string) EndReplicationCommand(cmdtag); break; + case T_ReadReplicationSlotCmd: + cmdtag = "READ_REPLICATION_SLOT"; + set_ps_display(cmdtag); + ReadReplicationSlot((ReadReplicationSlotCmd *) cmd_node); + EndReplicationCommand(cmdtag); + break; + case T_BaseBackupCmd: cmdtag = "BASE_BACKUP"; set_ps_display(cmdtag); diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index e0057daa06..541e9861ba 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -496,6 +496,7 @@ typedef enum NodeTag T_BaseBackupCmd, T_CreateReplicationSlotCmd, T_DropReplicationSlotCmd, + T_ReadReplicationSlotCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, T_SQLCmd, diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h index faa3a251f2..a746fafc12 100644 --- a/src/include/nodes/replnodes.h +++ b/src/include/nodes/replnodes.h @@ -87,6 +87,17 @@ typedef struct StartReplicationCmd } StartReplicationCmd; +/* ---------------------- + * READ_REPLICATION_SLOT command + * ---------------------- + */ +typedef struct ReadReplicationSlotCmd +{ + NodeTag type; + char *slotname; +} ReadReplicationSlotCmd; + + /* ---------------------- * TIMELINE_HISTORY command * ---------------------- diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl index bc62ec66bc..b3dc68d54b 100644 --- a/src/test/recovery/t/001_stream_rep.pl +++ b/src/test/recovery/t/001_stream_rep.pl @@ -6,7 +6,7 @@ use strict; use warnings; use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; -use Test::More tests => 49; +use Test::More tests => 53; # Initialize primary node my $node_primary = PostgreSQL::Test::Cluster->new('primary'); @@ -254,6 +254,36 @@ ok( $ret == 0, "SHOW with superuser-settable parameter, replication role and logical replication" ); +note "testing READ_REPLICATION_SLOT command for replication connection"; + +my $slotname = 'test_read_replication_slot_physical'; + +($ret, $stdout, $stderr) = $node_primary->psql( + 'postgres', + 'READ_REPLICATION_SLOT non_existent_slot;', + extra_params => [ '-d', $connstr_rep ]); +ok($ret == 0, "READ_REPLICATION_SLOT exit code 0 on success"); +like($stdout, qr/^||$/, + "READ_REPLICATION_SLOT returns NULL values if slot does not exist"); + +$node_primary->psql( + 'postgres', + "CREATE_REPLICATION_SLOT $slotname PHYSICAL RESERVE_WAL;", + extra_params => [ '-d', $connstr_rep ]); + +($ret, $stdout, $stderr) = $node_primary->psql( + 'postgres', + "READ_REPLICATION_SLOT $slotname;", + extra_params => [ '-d', $connstr_rep ]); +ok($ret == 0, "READ_REPLICATION_SLOT success with existing slot"); +like($stdout, qr/^physical\|[^|]*\|1$/, + "READ_REPLICATION_SLOT returns tuple with slot information"); + +$node_primary->psql( + 'postgres', + "DROP_REPLICATION_SLOT $slotname;", + extra_params => [ '-d', $connstr_rep ]); + note "switching to physical replication slot"; # Switch to using a physical replication slot. We can do this without a new diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl index 1655298bf5..46ea5a3866 100644 --- a/src/test/recovery/t/006_logical_decoding.pl +++ b/src/test/recovery/t/006_logical_decoding.pl @@ -10,7 +10,7 @@ use strict; use warnings; use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; -use Test::More tests => 14; +use Test::More tests => 15; use Config; # Initialize primary node @@ -39,6 +39,15 @@ ok( $stderr =~ m/replication slot "test_slot" was not created in this database/, "Logical decoding correctly fails to start"); +($result, $stdout, $stderr) = $node_primary->psql( + 'template1', + qq[READ_REPLICATION_SLOT test_slot;], + replication => 'database'); +like( + $stderr, + qr/cannot use "READ_REPLICATION_SLOT" with logical replication slot "test_slot"/, + 'READ_REPLICATION_SLOT not supported for logical slots'); + # Check case of walsender not using a database connection. Logical # decoding should not be allowed. ($result, $stdout, $stderr) = $node_primary->psql( diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 1c7bac0578..40fbcddd20 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2129,6 +2129,7 @@ ReadBufferMode ReadBytePtrType ReadExtraTocPtrType ReadFunc +ReadReplicationSlotCmd ReassignOwnedStmt RecheckForeignScan_function RecordCacheEntry