From 0266e98c6b865246c3031bbf55cb15f330134e30 Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Tue, 5 Oct 2021 12:52:49 -0400 Subject: [PATCH] Flexible options for CREATE_REPLICATION_SLOT. Like BASE_BACKUP, CREATE_REPLICATION_SLOT has historically used a hard-coded syntax. To improve future extensibility, adopt a flexible options syntax here, too. In the new syntax, instead of three mutually exclusive options EXPORT_SNAPSHOT, USE_SNAPSHOT, and NOEXPORT_SNAPSHOT, there is now a single SNAPSHOT option with three possible values: 'export', 'use', and 'nothing'. This commit does not remove support for the old syntax. It just adds the new one as an additional option, makes pg_receivewal, pg_recvlogical, and walreceiver processes use it. Patch by me, reviewed by Fabien Coelho, Sergei Kornilov, and Fujii Masao. Discussion: http://postgr.es/m/CA+TgmobAczXDRO_Gr2euo_TxgzaH1JxbNxvFx=HYvBinefNH8Q@mail.gmail.com Discussion: http://postgr.es/m/CA+TgmoZGwR=ZVWFeecncubEyPdwghnvfkkdBe9BLccLSiqdf9Q@mail.gmail.com --- doc/src/sgml/protocol.sgml | 37 +++++++---- .../libpqwalreceiver/libpqwalreceiver.c | 63 ++++++++++++++----- src/backend/replication/repl_gram.y | 35 ++++++----- src/backend/replication/walsender.c | 42 +++++++------ src/bin/pg_basebackup/streamutil.c | 40 ++++++++++-- 5 files changed, 152 insertions(+), 65 deletions(-) diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 8ed8833444..b95cc88599 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1914,7 +1914,7 @@ The commands accepted in replication mode are: - CREATE_REPLICATION_SLOT slot_name [ TEMPORARY ] { PHYSICAL [ RESERVE_WAL ] | LOGICAL output_plugin [ EXPORT_SNAPSHOT | NOEXPORT_SNAPSHOT | USE_SNAPSHOT | TWO_PHASE ] } + CREATE_REPLICATION_SLOT slot_name [ TEMPORARY ] { PHYSICAL | LOGICAL } [ ( option [, ...] ) ] CREATE_REPLICATION_SLOT @@ -1954,46 +1954,50 @@ The commands accepted in replication mode are: + + The following options are supported: + + - TWO_PHASE + TWO_PHASE [ boolean ] - Specify that this logical replication slot supports decoding of two-phase + If true, this logical replication slot supports decoding of two-phase transactions. With this option, two-phase commands like PREPARE TRANSACTION, COMMIT PREPARED and ROLLBACK PREPARED are decoded and transmitted. The transaction will be decoded and transmitted at PREPARE TRANSACTION time. + The default is false. - RESERVE_WAL + RESERVE_WAL [ boolean ] - Specify that this physical replication slot reserves WAL + If true, this physical replication slot reserves WAL immediately. Otherwise, WAL is only reserved upon connection from a streaming replication client. + The default is false. - EXPORT_SNAPSHOT - NOEXPORT_SNAPSHOT - USE_SNAPSHOT + SNAPSHOT { 'export' | 'use' | 'nothing' } Decides what to do with the snapshot created during logical slot - initialization. EXPORT_SNAPSHOT, which is the default, + initialization. 'export', which is the default, will export the snapshot for use in other sessions. This option can't - be used inside a transaction. USE_SNAPSHOT will use the + be used inside a transaction. 'use' will use the snapshot for the current transaction executing the command. This option must be used in a transaction, and CREATE_REPLICATION_SLOT must be the first command - run in that transaction. Finally, NOEXPORT_SNAPSHOT will + run in that transaction. Finally, 'nothing' will just use the snapshot for logical decoding as normal but won't do anything else with it. @@ -2052,6 +2056,17 @@ The commands accepted in replication mode are: + + CREATE_REPLICATION_SLOT slot_name [ TEMPORARY ] { PHYSICAL [ RESERVE_WAL ] | LOGICAL output_plugin [ EXPORT_SNAPSHOT | NOEXPORT_SNAPSHOT | USE_SNAPSHOT | TWO_PHASE ] } + + + + For compatibility with older releases, this alternative syntax for + the CREATE_REPLICATION_SLOT command is still supported. + + + + START_REPLICATION [ SLOT slot_name ] [ PHYSICAL ] XXX/XXX [ TIMELINE tli ] START_REPLICATION diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 19ea159af4..5c6e56a5b2 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -862,6 +862,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, PGresult *res; StringInfoData cmd; char *snapshot; + int use_new_options_syntax; + + use_new_options_syntax = (PQserverVersion(conn->streamConn) >= 150000); initStringInfo(&cmd); @@ -872,26 +875,58 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, if (conn->logical) { - appendStringInfoString(&cmd, " LOGICAL pgoutput"); + appendStringInfoString(&cmd, " LOGICAL pgoutput "); + if (use_new_options_syntax) + appendStringInfoChar(&cmd, '('); if (two_phase) - appendStringInfoString(&cmd, " TWO_PHASE"); - - switch (snapshot_action) { - case CRS_EXPORT_SNAPSHOT: - appendStringInfoString(&cmd, " EXPORT_SNAPSHOT"); - break; - case CRS_NOEXPORT_SNAPSHOT: - appendStringInfoString(&cmd, " NOEXPORT_SNAPSHOT"); - break; - case CRS_USE_SNAPSHOT: - appendStringInfoString(&cmd, " USE_SNAPSHOT"); - break; + appendStringInfoString(&cmd, "TWO_PHASE"); + if (use_new_options_syntax) + appendStringInfoString(&cmd, ", "); + else + appendStringInfoChar(&cmd, ' '); } + + if (use_new_options_syntax) + { + switch (snapshot_action) + { + case CRS_EXPORT_SNAPSHOT: + appendStringInfoString(&cmd, "SNAPSHOT 'export'"); + break; + case CRS_NOEXPORT_SNAPSHOT: + appendStringInfoString(&cmd, "SNAPSHOT 'nothing'"); + break; + case CRS_USE_SNAPSHOT: + appendStringInfoString(&cmd, "SNAPSHOT 'use'"); + break; + } + } + else + { + switch (snapshot_action) + { + case CRS_EXPORT_SNAPSHOT: + appendStringInfoString(&cmd, "EXPORT_SNAPSHOT"); + break; + case CRS_NOEXPORT_SNAPSHOT: + appendStringInfoString(&cmd, "NOEXPORT_SNAPSHOT"); + break; + case CRS_USE_SNAPSHOT: + appendStringInfoString(&cmd, "USE_SNAPSHOT"); + break; + } + } + + if (use_new_options_syntax) + appendStringInfoChar(&cmd, ')'); } else { - appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL"); + if (use_new_options_syntax) + appendStringInfoString(&cmd, " PHYSICAL (RESERVE_WAL)"); + else + appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL"); } res = libpqrcv_PQexec(conn->streamConn, cmd.data); diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index 3b59d62ed8..126380e2df 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -103,8 +103,8 @@ static SQLCmd *make_sqlcmd(void); %type plugin_opt_arg %type opt_slot var_name ident_or_keyword %type opt_temporary -%type create_slot_opt_list -%type create_slot_opt +%type create_slot_options create_slot_legacy_opt_list +%type create_slot_legacy_opt %% @@ -243,8 +243,8 @@ base_backup_legacy_opt: ; create_replication_slot: - /* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL RESERVE_WAL */ - K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL create_slot_opt_list + /* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL [options] */ + K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL create_slot_options { CreateReplicationSlotCmd *cmd; cmd = makeNode(CreateReplicationSlotCmd); @@ -254,8 +254,8 @@ create_replication_slot: cmd->options = $5; $$ = (Node *) cmd; } - /* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */ - | K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT create_slot_opt_list + /* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin [options] */ + | K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT create_slot_options { CreateReplicationSlotCmd *cmd; cmd = makeNode(CreateReplicationSlotCmd); @@ -268,28 +268,33 @@ create_replication_slot: } ; -create_slot_opt_list: - create_slot_opt_list create_slot_opt +create_slot_options: + '(' generic_option_list ')' { $$ = $2; } + | create_slot_legacy_opt_list { $$ = $1; } + ; + +create_slot_legacy_opt_list: + create_slot_legacy_opt_list create_slot_legacy_opt { $$ = lappend($1, $2); } | /* EMPTY */ { $$ = NIL; } ; -create_slot_opt: +create_slot_legacy_opt: K_EXPORT_SNAPSHOT { - $$ = makeDefElem("export_snapshot", - (Node *)makeInteger(true), -1); + $$ = makeDefElem("snapshot", + (Node *)makeString("export"), -1); } | K_NOEXPORT_SNAPSHOT { - $$ = makeDefElem("export_snapshot", - (Node *)makeInteger(false), -1); + $$ = makeDefElem("snapshot", + (Node *)makeString("nothing"), -1); } | K_USE_SNAPSHOT { - $$ = makeDefElem("use_snapshot", - (Node *)makeInteger(true), -1); + $$ = makeDefElem("snapshot", + (Node *)makeString("use"), -1); } | K_RESERVE_WAL { diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 3ca2a11389..b811a5c0ef 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -872,26 +872,30 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, { DefElem *defel = (DefElem *) lfirst(lc); - if (strcmp(defel->defname, "export_snapshot") == 0) + if (strcmp(defel->defname, "snapshot") == 0) { + char *action; + if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); + action = defGetString(defel); snapshot_action_given = true; - *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT : - CRS_NOEXPORT_SNAPSHOT; - } - else if (strcmp(defel->defname, "use_snapshot") == 0) - { - if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); - snapshot_action_given = true; - *snapshot_action = CRS_USE_SNAPSHOT; + if (strcmp(action, "export") == 0) + *snapshot_action = CRS_EXPORT_SNAPSHOT; + else if (strcmp(action, "nothing") == 0) + *snapshot_action = CRS_NOEXPORT_SNAPSHOT; + else if (strcmp(action, "use") == 0) + *snapshot_action = CRS_USE_SNAPSHOT; + else + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("unrecognized value for CREATE_REPLICATION_SLOT option \"%s\": \"%s\"", + defel->defname, action))); + } else if (strcmp(defel->defname, "reserve_wal") == 0) { @@ -901,7 +905,7 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, errmsg("conflicting or redundant options"))); reserve_wal_given = true; - *reserve_wal = true; + *reserve_wal = defGetBoolean(defel); } else if (strcmp(defel->defname, "two_phase") == 0) { @@ -910,7 +914,7 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); two_phase_given = true; - *two_phase = true; + *two_phase = defGetBoolean(defel); } else elog(ERROR, "unrecognized option: %s", defel->defname); @@ -980,7 +984,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) ereport(ERROR, /*- translator: %s is a CREATE_REPLICATION_SLOT statement */ (errmsg("%s must not be called inside a transaction", - "CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT"))); + "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')"))); need_full_snapshot = true; } @@ -990,25 +994,25 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) ereport(ERROR, /*- translator: %s is a CREATE_REPLICATION_SLOT statement */ (errmsg("%s must be called inside a transaction", - "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT"))); + "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')"))); if (XactIsoLevel != XACT_REPEATABLE_READ) ereport(ERROR, /*- translator: %s is a CREATE_REPLICATION_SLOT statement */ (errmsg("%s must be called in REPEATABLE READ isolation mode transaction", - "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT"))); + "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')"))); if (FirstSnapshotSet) ereport(ERROR, /*- translator: %s is a CREATE_REPLICATION_SLOT statement */ (errmsg("%s must be called before any query", - "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT"))); + "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')"))); if (IsSubTransaction()) ereport(ERROR, /*- translator: %s is a CREATE_REPLICATION_SLOT statement */ (errmsg("%s must not be called in a subtransaction", - "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT"))); + "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')"))); need_full_snapshot = true; } diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c index d782b81adc..37237cd5d9 100644 --- a/src/bin/pg_basebackup/streamutil.c +++ b/src/bin/pg_basebackup/streamutil.c @@ -490,6 +490,7 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin, { PQExpBuffer query; PGresult *res; + bool use_new_option_syntax = (PQserverVersion(conn) >= 150000); query = createPQExpBuffer(); @@ -498,27 +499,54 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin, Assert(!(two_phase && is_physical)); Assert(slot_name != NULL); - /* Build query */ + /* Build base portion of query */ appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\"", slot_name); if (is_temporary) appendPQExpBufferStr(query, " TEMPORARY"); if (is_physical) - { appendPQExpBufferStr(query, " PHYSICAL"); + else + appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin); + + /* Add any requested options */ + if (use_new_option_syntax) + appendPQExpBufferStr(query, " ("); + if (is_physical) + { if (reserve_wal) - appendPQExpBufferStr(query, " RESERVE_WAL"); + AppendPlainCommandOption(query, use_new_option_syntax, + "RESERVE_WAL"); } else { - appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin); if (two_phase && PQserverVersion(conn) >= 150000) - appendPQExpBufferStr(query, " TWO_PHASE"); + AppendPlainCommandOption(query, use_new_option_syntax, + "TWO_PHASE"); if (PQserverVersion(conn) >= 100000) + { /* pg_recvlogical doesn't use an exported snapshot, so suppress */ - appendPQExpBufferStr(query, " NOEXPORT_SNAPSHOT"); + if (use_new_option_syntax) + AppendStringCommandOption(query, use_new_option_syntax, + "SNAPSHOT", "nothing"); + else + AppendPlainCommandOption(query, use_new_option_syntax, + "NOEXPORT_SNAPSHOT"); + } + } + if (use_new_option_syntax) + { + /* Suppress option list if it would be empty, otherwise terminate */ + if (query->data[query->len - 1] == '(') + { + query->len -= 2; + query->data[query->len] = '\0'; + } + else + appendPQExpBufferChar(query, ')'); } + /* Now run the query */ res = PQexec(conn, query->data); if (PQresultStatus(res) != PGRES_TUPLES_OK) {