Flexible options for CREATE_REPLICATION_SLOT.

Like BASE_BACKUP, CREATE_REPLICATION_SLOT has historically used a
hard-coded syntax.  To improve future extensibility, adopt a flexible
options syntax here, too.

In the new syntax, instead of three mutually exclusive options
EXPORT_SNAPSHOT, USE_SNAPSHOT, and NOEXPORT_SNAPSHOT, there is now a single
SNAPSHOT option with three possible values: 'export', 'use', and 'nothing'.

This commit does not remove support for the old syntax. It just adds
the new one as an additional option, makes pg_receivewal,
pg_recvlogical, and walreceiver processes use it.

Patch by me, reviewed by Fabien Coelho, Sergei Kornilov, and
Fujii Masao.

Discussion: http://postgr.es/m/CA+TgmobAczXDRO_Gr2euo_TxgzaH1JxbNxvFx=HYvBinefNH8Q@mail.gmail.com
Discussion: http://postgr.es/m/CA+TgmoZGwR=ZVWFeecncubEyPdwghnvfkkdBe9BLccLSiqdf9Q@mail.gmail.com
This commit is contained in:
Robert Haas 2021-10-05 12:52:49 -04:00
parent 0ba281cb4b
commit 0266e98c6b
5 changed files with 152 additions and 65 deletions

View File

@ -1914,7 +1914,7 @@ The commands accepted in replication mode are:
</varlistentry> </varlistentry>
<varlistentry id="protocol-replication-create-slot" xreflabel="CREATE_REPLICATION_SLOT"> <varlistentry id="protocol-replication-create-slot" xreflabel="CREATE_REPLICATION_SLOT">
<term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> [ <literal>TEMPORARY</literal> ] { <literal>PHYSICAL</literal> [ <literal>RESERVE_WAL</literal> ] | <literal>LOGICAL</literal> <replaceable class="parameter">output_plugin</replaceable> [ <literal>EXPORT_SNAPSHOT</literal> | <literal>NOEXPORT_SNAPSHOT</literal> | <literal>USE_SNAPSHOT</literal> | <literal>TWO_PHASE</literal> ] } <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> [ <literal>TEMPORARY</literal> ] { <literal>PHYSICAL</literal> | <literal>LOGICAL</literal> } [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ]
<indexterm><primary>CREATE_REPLICATION_SLOT</primary></indexterm> <indexterm><primary>CREATE_REPLICATION_SLOT</primary></indexterm>
</term> </term>
<listitem> <listitem>
@ -1954,46 +1954,50 @@ The commands accepted in replication mode are:
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
</variablelist>
<para>The following options are supported:</para>
<variablelist>
<varlistentry> <varlistentry>
<term><literal>TWO_PHASE</literal></term> <term><literal>TWO_PHASE [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
<listitem> <listitem>
<para> <para>
Specify that this logical replication slot supports decoding of two-phase If true, this logical replication slot supports decoding of two-phase
transactions. With this option, two-phase commands like transactions. With this option, two-phase commands like
<literal>PREPARE TRANSACTION</literal>, <literal>COMMIT PREPARED</literal> <literal>PREPARE TRANSACTION</literal>, <literal>COMMIT PREPARED</literal>
and <literal>ROLLBACK PREPARED</literal> are decoded and transmitted. and <literal>ROLLBACK PREPARED</literal> are decoded and transmitted.
The transaction will be decoded and transmitted at The transaction will be decoded and transmitted at
<literal>PREPARE TRANSACTION</literal> time. <literal>PREPARE TRANSACTION</literal> time.
The default is false.
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry> <varlistentry>
<term><literal>RESERVE_WAL</literal></term> <term><literal>RESERVE_WAL [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
<listitem> <listitem>
<para> <para>
Specify that this physical replication slot reserves <acronym>WAL</acronym> If true, this physical replication slot reserves <acronym>WAL</acronym>
immediately. Otherwise, <acronym>WAL</acronym> is only reserved upon immediately. Otherwise, <acronym>WAL</acronym> is only reserved upon
connection from a streaming replication client. connection from a streaming replication client.
The default is false.
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry> <varlistentry>
<term><literal>EXPORT_SNAPSHOT</literal></term> <term><literal>SNAPSHOT { 'export' | 'use' | 'nothing' }</literal></term>
<term><literal>NOEXPORT_SNAPSHOT</literal></term>
<term><literal>USE_SNAPSHOT</literal></term>
<listitem> <listitem>
<para> <para>
Decides what to do with the snapshot created during logical slot Decides what to do with the snapshot created during logical slot
initialization. <literal>EXPORT_SNAPSHOT</literal>, which is the default, initialization. <literal>'export'</literal>, which is the default,
will export the snapshot for use in other sessions. This option can't will export the snapshot for use in other sessions. This option can't
be used inside a transaction. <literal>USE_SNAPSHOT</literal> will use the be used inside a transaction. <literal>'use'</literal> will use the
snapshot for the current transaction executing the command. This snapshot for the current transaction executing the command. This
option must be used in a transaction, and option must be used in a transaction, and
<literal>CREATE_REPLICATION_SLOT</literal> must be the first command <literal>CREATE_REPLICATION_SLOT</literal> must be the first command
run in that transaction. Finally, <literal>NOEXPORT_SNAPSHOT</literal> will run in that transaction. Finally, <literal>'nothing'</literal> will
just use the snapshot for logical decoding as normal but won't do just use the snapshot for logical decoding as normal but won't do
anything else with it. anything else with it.
</para> </para>
@ -2052,6 +2056,17 @@ The commands accepted in replication mode are:
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry>
<term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> [ <literal>TEMPORARY</literal> ] { <literal>PHYSICAL</literal> [ <literal>RESERVE_WAL</literal> ] | <literal>LOGICAL</literal> <replaceable class="parameter">output_plugin</replaceable> [ <literal>EXPORT_SNAPSHOT</literal> | <literal>NOEXPORT_SNAPSHOT</literal> | <literal>USE_SNAPSHOT</literal> | <literal>TWO_PHASE</literal> ] }
</term>
<listitem>
<para>
For compatibility with older releases, this alternative syntax for
the <literal>CREATE_REPLICATION_SLOT</literal> command is still supported.
</para>
</listitem>
</varlistentry>
<varlistentry> <varlistentry>
<term><literal>START_REPLICATION</literal> [ <literal>SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ] [ <literal>PHYSICAL</literal> ] <replaceable class="parameter">XXX/XXX</replaceable> [ <literal>TIMELINE</literal> <replaceable class="parameter">tli</replaceable> ] <term><literal>START_REPLICATION</literal> [ <literal>SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ] [ <literal>PHYSICAL</literal> ] <replaceable class="parameter">XXX/XXX</replaceable> [ <literal>TIMELINE</literal> <replaceable class="parameter">tli</replaceable> ]
<indexterm><primary>START_REPLICATION</primary></indexterm> <indexterm><primary>START_REPLICATION</primary></indexterm>

View File

@ -862,6 +862,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
PGresult *res; PGresult *res;
StringInfoData cmd; StringInfoData cmd;
char *snapshot; char *snapshot;
int use_new_options_syntax;
use_new_options_syntax = (PQserverVersion(conn->streamConn) >= 150000);
initStringInfo(&cmd); initStringInfo(&cmd);
@ -872,26 +875,58 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
if (conn->logical) if (conn->logical)
{ {
appendStringInfoString(&cmd, " LOGICAL pgoutput"); appendStringInfoString(&cmd, " LOGICAL pgoutput ");
if (use_new_options_syntax)
appendStringInfoChar(&cmd, '(');
if (two_phase) if (two_phase)
appendStringInfoString(&cmd, " TWO_PHASE");
switch (snapshot_action)
{ {
case CRS_EXPORT_SNAPSHOT: appendStringInfoString(&cmd, "TWO_PHASE");
appendStringInfoString(&cmd, " EXPORT_SNAPSHOT"); if (use_new_options_syntax)
break; appendStringInfoString(&cmd, ", ");
case CRS_NOEXPORT_SNAPSHOT: else
appendStringInfoString(&cmd, " NOEXPORT_SNAPSHOT"); appendStringInfoChar(&cmd, ' ');
break;
case CRS_USE_SNAPSHOT:
appendStringInfoString(&cmd, " USE_SNAPSHOT");
break;
} }
if (use_new_options_syntax)
{
switch (snapshot_action)
{
case CRS_EXPORT_SNAPSHOT:
appendStringInfoString(&cmd, "SNAPSHOT 'export'");
break;
case CRS_NOEXPORT_SNAPSHOT:
appendStringInfoString(&cmd, "SNAPSHOT 'nothing'");
break;
case CRS_USE_SNAPSHOT:
appendStringInfoString(&cmd, "SNAPSHOT 'use'");
break;
}
}
else
{
switch (snapshot_action)
{
case CRS_EXPORT_SNAPSHOT:
appendStringInfoString(&cmd, "EXPORT_SNAPSHOT");
break;
case CRS_NOEXPORT_SNAPSHOT:
appendStringInfoString(&cmd, "NOEXPORT_SNAPSHOT");
break;
case CRS_USE_SNAPSHOT:
appendStringInfoString(&cmd, "USE_SNAPSHOT");
break;
}
}
if (use_new_options_syntax)
appendStringInfoChar(&cmd, ')');
} }
else else
{ {
appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL"); if (use_new_options_syntax)
appendStringInfoString(&cmd, " PHYSICAL (RESERVE_WAL)");
else
appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
} }
res = libpqrcv_PQexec(conn->streamConn, cmd.data); res = libpqrcv_PQexec(conn->streamConn, cmd.data);

View File

@ -103,8 +103,8 @@ static SQLCmd *make_sqlcmd(void);
%type <node> plugin_opt_arg %type <node> plugin_opt_arg
%type <str> opt_slot var_name ident_or_keyword %type <str> opt_slot var_name ident_or_keyword
%type <boolval> opt_temporary %type <boolval> opt_temporary
%type <list> create_slot_opt_list %type <list> create_slot_options create_slot_legacy_opt_list
%type <defelt> create_slot_opt %type <defelt> create_slot_legacy_opt
%% %%
@ -243,8 +243,8 @@ base_backup_legacy_opt:
; ;
create_replication_slot: create_replication_slot:
/* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL RESERVE_WAL */ /* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL [options] */
K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL create_slot_opt_list K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL create_slot_options
{ {
CreateReplicationSlotCmd *cmd; CreateReplicationSlotCmd *cmd;
cmd = makeNode(CreateReplicationSlotCmd); cmd = makeNode(CreateReplicationSlotCmd);
@ -254,8 +254,8 @@ create_replication_slot:
cmd->options = $5; cmd->options = $5;
$$ = (Node *) cmd; $$ = (Node *) cmd;
} }
/* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */ /* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin [options] */
| K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT create_slot_opt_list | K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT create_slot_options
{ {
CreateReplicationSlotCmd *cmd; CreateReplicationSlotCmd *cmd;
cmd = makeNode(CreateReplicationSlotCmd); cmd = makeNode(CreateReplicationSlotCmd);
@ -268,28 +268,33 @@ create_replication_slot:
} }
; ;
create_slot_opt_list: create_slot_options:
create_slot_opt_list create_slot_opt '(' generic_option_list ')' { $$ = $2; }
| create_slot_legacy_opt_list { $$ = $1; }
;
create_slot_legacy_opt_list:
create_slot_legacy_opt_list create_slot_legacy_opt
{ $$ = lappend($1, $2); } { $$ = lappend($1, $2); }
| /* EMPTY */ | /* EMPTY */
{ $$ = NIL; } { $$ = NIL; }
; ;
create_slot_opt: create_slot_legacy_opt:
K_EXPORT_SNAPSHOT K_EXPORT_SNAPSHOT
{ {
$$ = makeDefElem("export_snapshot", $$ = makeDefElem("snapshot",
(Node *)makeInteger(true), -1); (Node *)makeString("export"), -1);
} }
| K_NOEXPORT_SNAPSHOT | K_NOEXPORT_SNAPSHOT
{ {
$$ = makeDefElem("export_snapshot", $$ = makeDefElem("snapshot",
(Node *)makeInteger(false), -1); (Node *)makeString("nothing"), -1);
} }
| K_USE_SNAPSHOT | K_USE_SNAPSHOT
{ {
$$ = makeDefElem("use_snapshot", $$ = makeDefElem("snapshot",
(Node *)makeInteger(true), -1); (Node *)makeString("use"), -1);
} }
| K_RESERVE_WAL | K_RESERVE_WAL
{ {

View File

@ -872,26 +872,30 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
{ {
DefElem *defel = (DefElem *) lfirst(lc); DefElem *defel = (DefElem *) lfirst(lc);
if (strcmp(defel->defname, "export_snapshot") == 0) if (strcmp(defel->defname, "snapshot") == 0)
{ {
char *action;
if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL) if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR), (errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options"))); errmsg("conflicting or redundant options")));
action = defGetString(defel);
snapshot_action_given = true; snapshot_action_given = true;
*snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
CRS_NOEXPORT_SNAPSHOT;
}
else if (strcmp(defel->defname, "use_snapshot") == 0)
{
if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
snapshot_action_given = true; if (strcmp(action, "export") == 0)
*snapshot_action = CRS_USE_SNAPSHOT; *snapshot_action = CRS_EXPORT_SNAPSHOT;
else if (strcmp(action, "nothing") == 0)
*snapshot_action = CRS_NOEXPORT_SNAPSHOT;
else if (strcmp(action, "use") == 0)
*snapshot_action = CRS_USE_SNAPSHOT;
else
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("unrecognized value for CREATE_REPLICATION_SLOT option \"%s\": \"%s\"",
defel->defname, action)));
} }
else if (strcmp(defel->defname, "reserve_wal") == 0) else if (strcmp(defel->defname, "reserve_wal") == 0)
{ {
@ -901,7 +905,7 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
errmsg("conflicting or redundant options"))); errmsg("conflicting or redundant options")));
reserve_wal_given = true; reserve_wal_given = true;
*reserve_wal = true; *reserve_wal = defGetBoolean(defel);
} }
else if (strcmp(defel->defname, "two_phase") == 0) else if (strcmp(defel->defname, "two_phase") == 0)
{ {
@ -910,7 +914,7 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
(errcode(ERRCODE_SYNTAX_ERROR), (errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options"))); errmsg("conflicting or redundant options")));
two_phase_given = true; two_phase_given = true;
*two_phase = true; *two_phase = defGetBoolean(defel);
} }
else else
elog(ERROR, "unrecognized option: %s", defel->defname); elog(ERROR, "unrecognized option: %s", defel->defname);
@ -980,7 +984,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
ereport(ERROR, ereport(ERROR,
/*- translator: %s is a CREATE_REPLICATION_SLOT statement */ /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
(errmsg("%s must not be called inside a transaction", (errmsg("%s must not be called inside a transaction",
"CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT"))); "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
need_full_snapshot = true; need_full_snapshot = true;
} }
@ -990,25 +994,25 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
ereport(ERROR, ereport(ERROR,
/*- translator: %s is a CREATE_REPLICATION_SLOT statement */ /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
(errmsg("%s must be called inside a transaction", (errmsg("%s must be called inside a transaction",
"CREATE_REPLICATION_SLOT ... USE_SNAPSHOT"))); "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
if (XactIsoLevel != XACT_REPEATABLE_READ) if (XactIsoLevel != XACT_REPEATABLE_READ)
ereport(ERROR, ereport(ERROR,
/*- translator: %s is a CREATE_REPLICATION_SLOT statement */ /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
(errmsg("%s must be called in REPEATABLE READ isolation mode transaction", (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
"CREATE_REPLICATION_SLOT ... USE_SNAPSHOT"))); "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
if (FirstSnapshotSet) if (FirstSnapshotSet)
ereport(ERROR, ereport(ERROR,
/*- translator: %s is a CREATE_REPLICATION_SLOT statement */ /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
(errmsg("%s must be called before any query", (errmsg("%s must be called before any query",
"CREATE_REPLICATION_SLOT ... USE_SNAPSHOT"))); "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
if (IsSubTransaction()) if (IsSubTransaction())
ereport(ERROR, ereport(ERROR,
/*- translator: %s is a CREATE_REPLICATION_SLOT statement */ /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
(errmsg("%s must not be called in a subtransaction", (errmsg("%s must not be called in a subtransaction",
"CREATE_REPLICATION_SLOT ... USE_SNAPSHOT"))); "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
need_full_snapshot = true; need_full_snapshot = true;
} }

View File

@ -490,6 +490,7 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
{ {
PQExpBuffer query; PQExpBuffer query;
PGresult *res; PGresult *res;
bool use_new_option_syntax = (PQserverVersion(conn) >= 150000);
query = createPQExpBuffer(); query = createPQExpBuffer();
@ -498,27 +499,54 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
Assert(!(two_phase && is_physical)); Assert(!(two_phase && is_physical));
Assert(slot_name != NULL); Assert(slot_name != NULL);
/* Build query */ /* Build base portion of query */
appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\"", slot_name); appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\"", slot_name);
if (is_temporary) if (is_temporary)
appendPQExpBufferStr(query, " TEMPORARY"); appendPQExpBufferStr(query, " TEMPORARY");
if (is_physical) if (is_physical)
{
appendPQExpBufferStr(query, " PHYSICAL"); appendPQExpBufferStr(query, " PHYSICAL");
else
appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin);
/* Add any requested options */
if (use_new_option_syntax)
appendPQExpBufferStr(query, " (");
if (is_physical)
{
if (reserve_wal) if (reserve_wal)
appendPQExpBufferStr(query, " RESERVE_WAL"); AppendPlainCommandOption(query, use_new_option_syntax,
"RESERVE_WAL");
} }
else else
{ {
appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin);
if (two_phase && PQserverVersion(conn) >= 150000) if (two_phase && PQserverVersion(conn) >= 150000)
appendPQExpBufferStr(query, " TWO_PHASE"); AppendPlainCommandOption(query, use_new_option_syntax,
"TWO_PHASE");
if (PQserverVersion(conn) >= 100000) if (PQserverVersion(conn) >= 100000)
{
/* pg_recvlogical doesn't use an exported snapshot, so suppress */ /* pg_recvlogical doesn't use an exported snapshot, so suppress */
appendPQExpBufferStr(query, " NOEXPORT_SNAPSHOT"); if (use_new_option_syntax)
AppendStringCommandOption(query, use_new_option_syntax,
"SNAPSHOT", "nothing");
else
AppendPlainCommandOption(query, use_new_option_syntax,
"NOEXPORT_SNAPSHOT");
}
}
if (use_new_option_syntax)
{
/* Suppress option list if it would be empty, otherwise terminate */
if (query->data[query->len - 1] == '(')
{
query->len -= 2;
query->data[query->len] = '\0';
}
else
appendPQExpBufferChar(query, ')');
} }
/* Now run the query */
res = PQexec(conn, query->data); res = PQexec(conn, query->data);
if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
{ {