Refactor function parse_subscription_options.

Instead of using multiple parameters in parse_subscription_options
function signature, use the struct SubOpts that encapsulate all the
subscription options and their values. It will be useful for future work
where we need to add other options in the subscription. Also, use bitmaps
to pass the supported and retrieve the specified options much like the way
it is done in the commit a3dc926009.

Author: Bharath Rupireddy
Reviewed-By: Peter Smith, Amit Kapila, Alvaro Herrera
Discussion: https://postgr.es/m/CALj2ACXtoQczfNsDQWobypVvHbX2DtgEHn8DawS0eGFwuo72kw@mail.gmail.com
This commit is contained in:
Amit Kapila 2021-07-06 07:46:50 +05:30
parent 9ee91cc583
commit 8aafb02616
2 changed files with 202 additions and 228 deletions

View File

@ -46,6 +46,41 @@
#include "utils/memutils.h"
#include "utils/syscache.h"
/*
* Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION
* command.
*/
#define SUBOPT_CONNECT 0x00000001
#define SUBOPT_ENABLED 0x00000002
#define SUBOPT_CREATE_SLOT 0x00000004
#define SUBOPT_SLOT_NAME 0x00000008
#define SUBOPT_COPY_DATA 0x00000010
#define SUBOPT_SYNCHRONOUS_COMMIT 0x00000020
#define SUBOPT_REFRESH 0x00000040
#define SUBOPT_BINARY 0x00000080
#define SUBOPT_STREAMING 0x00000100
/* check if the 'val' has 'bits' set */
#define IsSet(val, bits) (((val) & (bits)) == (bits))
/*
* Structure to hold a bitmap representing the user-provided CREATE/ALTER
* SUBSCRIPTION command options and the parsed/default values of each of them.
*/
typedef struct SubOpts
{
bits32 specified_opts;
char *slot_name;
char *synchronous_commit;
bool connect;
bool enabled;
bool create_slot;
bool copy_data;
bool refresh;
bool binary;
bool streaming;
} SubOpts;
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
static void check_duplicates_in_publist(List *publist, Datum *datums);
static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
@ -56,164 +91,151 @@ static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname,
* Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
*
* Since not all options can be specified in both commands, this function
* will report an error on options if the target output pointer is NULL to
* accommodate that.
* will report an error if mutually exclusive options are specified.
*
* Caller is expected to have cleared 'opts'.
*/
static void
parse_subscription_options(List *options,
bool *connect,
bool *enabled_given, bool *enabled,
bool *create_slot,
bool *slot_name_given, char **slot_name,
bool *copy_data,
char **synchronous_commit,
bool *refresh,
bool *binary_given, bool *binary,
bool *streaming_given, bool *streaming)
parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *opts)
{
ListCell *lc;
bool connect_given = false;
bool create_slot_given = false;
bool copy_data_given = false;
bool refresh_given = false;
/* If connect is specified, the others also need to be. */
Assert(!connect || (enabled && create_slot && copy_data));
/* caller must expect some option */
Assert(supported_opts != 0);
if (connect)
*connect = true;
if (enabled)
{
*enabled_given = false;
*enabled = true;
}
if (create_slot)
*create_slot = true;
if (slot_name)
{
*slot_name_given = false;
*slot_name = NULL;
}
if (copy_data)
*copy_data = true;
if (synchronous_commit)
*synchronous_commit = NULL;
if (refresh)
*refresh = true;
if (binary)
{
*binary_given = false;
*binary = false;
}
if (streaming)
{
*streaming_given = false;
*streaming = false;
}
/* If connect option is supported, these others also need to be. */
Assert(!IsSet(supported_opts, SUBOPT_CONNECT) ||
IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
SUBOPT_COPY_DATA));
/* Set default values for the boolean supported options. */
if (IsSet(supported_opts, SUBOPT_CONNECT))
opts->connect = true;
if (IsSet(supported_opts, SUBOPT_ENABLED))
opts->enabled = true;
if (IsSet(supported_opts, SUBOPT_CREATE_SLOT))
opts->create_slot = true;
if (IsSet(supported_opts, SUBOPT_COPY_DATA))
opts->copy_data = true;
if (IsSet(supported_opts, SUBOPT_REFRESH))
opts->refresh = true;
if (IsSet(supported_opts, SUBOPT_BINARY))
opts->binary = false;
if (IsSet(supported_opts, SUBOPT_STREAMING))
opts->streaming = false;
/* Parse options */
foreach(lc, options)
foreach(lc, stmt_options)
{
DefElem *defel = (DefElem *) lfirst(lc);
if (strcmp(defel->defname, "connect") == 0 && connect)
if (IsSet(supported_opts, SUBOPT_CONNECT) &&
strcmp(defel->defname, "connect") == 0)
{
if (connect_given)
if (IsSet(opts->specified_opts, SUBOPT_CONNECT))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
connect_given = true;
*connect = defGetBoolean(defel);
opts->specified_opts |= SUBOPT_CONNECT;
opts->connect = defGetBoolean(defel);
}
else if (strcmp(defel->defname, "enabled") == 0 && enabled)
else if (IsSet(supported_opts, SUBOPT_ENABLED) &&
strcmp(defel->defname, "enabled") == 0)
{
if (*enabled_given)
if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
*enabled_given = true;
*enabled = defGetBoolean(defel);
opts->specified_opts |= SUBOPT_ENABLED;
opts->enabled = defGetBoolean(defel);
}
else if (strcmp(defel->defname, "create_slot") == 0 && create_slot)
else if (IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
strcmp(defel->defname, "create_slot") == 0)
{
if (create_slot_given)
if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
create_slot_given = true;
*create_slot = defGetBoolean(defel);
opts->specified_opts |= SUBOPT_CREATE_SLOT;
opts->create_slot = defGetBoolean(defel);
}
else if (strcmp(defel->defname, "slot_name") == 0 && slot_name)
else if (IsSet(supported_opts, SUBOPT_SLOT_NAME) &&
strcmp(defel->defname, "slot_name") == 0)
{
if (*slot_name_given)
if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
*slot_name_given = true;
*slot_name = defGetString(defel);
opts->specified_opts |= SUBOPT_SLOT_NAME;
opts->slot_name = defGetString(defel);
/* Setting slot_name = NONE is treated as no slot name. */
if (strcmp(*slot_name, "none") == 0)
*slot_name = NULL;
if (strcmp(opts->slot_name, "none") == 0)
opts->slot_name = NULL;
}
else if (strcmp(defel->defname, "copy_data") == 0 && copy_data)
else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
strcmp(defel->defname, "copy_data") == 0)
{
if (copy_data_given)
if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
copy_data_given = true;
*copy_data = defGetBoolean(defel);
opts->specified_opts |= SUBOPT_COPY_DATA;
opts->copy_data = defGetBoolean(defel);
}
else if (strcmp(defel->defname, "synchronous_commit") == 0 &&
synchronous_commit)
else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
strcmp(defel->defname, "synchronous_commit") == 0)
{
if (*synchronous_commit)
if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
*synchronous_commit = defGetString(defel);
opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
opts->synchronous_commit = defGetString(defel);
/* Test if the given value is valid for synchronous_commit GUC. */
(void) set_config_option("synchronous_commit", *synchronous_commit,
(void) set_config_option("synchronous_commit", opts->synchronous_commit,
PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
false, 0, false);
}
else if (strcmp(defel->defname, "refresh") == 0 && refresh)
else if (IsSet(supported_opts, SUBOPT_REFRESH) &&
strcmp(defel->defname, "refresh") == 0)
{
if (refresh_given)
if (IsSet(opts->specified_opts, SUBOPT_REFRESH))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
refresh_given = true;
*refresh = defGetBoolean(defel);
opts->specified_opts |= SUBOPT_REFRESH;
opts->refresh = defGetBoolean(defel);
}
else if (strcmp(defel->defname, "binary") == 0 && binary)
else if (IsSet(supported_opts, SUBOPT_BINARY) &&
strcmp(defel->defname, "binary") == 0)
{
if (*binary_given)
if (IsSet(opts->specified_opts, SUBOPT_BINARY))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
*binary_given = true;
*binary = defGetBoolean(defel);
opts->specified_opts |= SUBOPT_BINARY;
opts->binary = defGetBoolean(defel);
}
else if (strcmp(defel->defname, "streaming") == 0 && streaming)
else if (IsSet(supported_opts, SUBOPT_STREAMING) &&
strcmp(defel->defname, "streaming") == 0)
{
if (*streaming_given)
if (IsSet(opts->specified_opts, SUBOPT_STREAMING))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
*streaming_given = true;
*streaming = defGetBoolean(defel);
opts->specified_opts |= SUBOPT_STREAMING;
opts->streaming = defGetBoolean(defel);
}
else
ereport(ERROR,
@ -225,63 +247,81 @@ parse_subscription_options(List *options,
* We've been explicitly asked to not connect, that requires some
* additional processing.
*/
if (connect && !*connect)
if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT))
{
/* Check for incompatible options from the user. */
if (enabled && *enabled_given && *enabled)
if (opts->enabled &&
IsSet(supported_opts, SUBOPT_ENABLED) &&
IsSet(opts->specified_opts, SUBOPT_ENABLED))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
/*- translator: both %s are strings of the form "option = value" */
errmsg("%s and %s are mutually exclusive options",
"connect = false", "enabled = true")));
if (create_slot && create_slot_given && *create_slot)
if (opts->create_slot &&
IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("%s and %s are mutually exclusive options",
"connect = false", "create_slot = true")));
if (copy_data && copy_data_given && *copy_data)
if (opts->copy_data &&
IsSet(supported_opts, SUBOPT_COPY_DATA) &&
IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("%s and %s are mutually exclusive options",
"connect = false", "copy_data = true")));
/* Change the defaults of other options. */
*enabled = false;
*create_slot = false;
*copy_data = false;
opts->enabled = false;
opts->create_slot = false;
opts->copy_data = false;
}
/*
* Do additional checking for disallowed combination when slot_name = NONE
* was used.
*/
if (slot_name && *slot_name_given && !*slot_name)
if (!opts->slot_name &&
IsSet(supported_opts, SUBOPT_SLOT_NAME) &&
IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
{
if (enabled && *enabled_given && *enabled)
if (opts->enabled &&
IsSet(supported_opts, SUBOPT_ENABLED) &&
IsSet(opts->specified_opts, SUBOPT_ENABLED))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
/*- translator: both %s are strings of the form "option = value" */
errmsg("%s and %s are mutually exclusive options",
"slot_name = NONE", "enabled = true")));
if (create_slot && create_slot_given && *create_slot)
if (opts->create_slot &&
IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
/*- translator: both %s are strings of the form "option = value" */
errmsg("%s and %s are mutually exclusive options",
"slot_name = NONE", "create_slot = true")));
if (enabled && !*enabled_given && *enabled)
if (opts->enabled &&
IsSet(supported_opts, SUBOPT_ENABLED) &&
!IsSet(opts->specified_opts, SUBOPT_ENABLED))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
/*- translator: both %s are strings of the form "option = value" */
errmsg("subscription with %s must also set %s",
"slot_name = NONE", "enabled = false")));
if (create_slot && !create_slot_given && *create_slot)
if (opts->create_slot &&
IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
!IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
/*- translator: both %s are strings of the form "option = value" */
errmsg("subscription with %s must also set %s",
"slot_name = NONE", "create_slot = false")));
}
@ -331,37 +371,22 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
Datum values[Natts_pg_subscription];
Oid owner = GetUserId();
HeapTuple tup;
bool connect;
bool enabled_given;
bool enabled;
bool copy_data;
bool streaming;
bool streaming_given;
char *synchronous_commit;
char *conninfo;
char *slotname;
bool slotname_given;
bool binary;
bool binary_given;
char originname[NAMEDATALEN];
bool create_slot;
List *publications;
bits32 supported_opts;
SubOpts opts = {0};
/*
* Parse and check options.
*
* Connection and publication should not be specified here.
*/
parse_subscription_options(stmt->options,
&connect,
&enabled_given, &enabled,
&create_slot,
&slotname_given, &slotname,
&copy_data,
&synchronous_commit,
NULL, /* no "refresh" */
&binary_given, &binary,
&streaming_given, &streaming);
supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
SUBOPT_STREAMING);
parse_subscription_options(stmt->options, supported_opts, &opts);
/*
* Since creating a replication slot is not transactional, rolling back
@ -369,7 +394,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
* CREATE SUBSCRIPTION inside a transaction block if creating a
* replication slot.
*/
if (create_slot)
if (opts.create_slot)
PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
if (!superuser())
@ -399,12 +424,13 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
stmt->subname)));
}
if (!slotname_given && slotname == NULL)
slotname = stmt->subname;
if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
opts.slot_name == NULL)
opts.slot_name = stmt->subname;
/* The default for synchronous_commit of subscriptions is off. */
if (synchronous_commit == NULL)
synchronous_commit = "off";
if (opts.synchronous_commit == NULL)
opts.synchronous_commit = "off";
conninfo = stmt->conninfo;
publications = stmt->publication;
@ -426,18 +452,18 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
values[Anum_pg_subscription_subname - 1] =
DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
values[Anum_pg_subscription_substream - 1] = BoolGetDatum(streaming);
values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
values[Anum_pg_subscription_substream - 1] = BoolGetDatum(opts.streaming);
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(conninfo);
if (slotname)
if (opts.slot_name)
values[Anum_pg_subscription_subslotname - 1] =
DirectFunctionCall1(namein, CStringGetDatum(slotname));
DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
else
nulls[Anum_pg_subscription_subslotname - 1] = true;
values[Anum_pg_subscription_subsynccommit - 1] =
CStringGetTextDatum(synchronous_commit);
CStringGetTextDatum(opts.synchronous_commit);
values[Anum_pg_subscription_subpublications - 1] =
publicationListToArray(publications);
@ -456,7 +482,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
* Connect to remote side to execute requested commands and fetch table
* info.
*/
if (connect)
if (opts.connect)
{
char *err;
WalReceiverConn *wrconn;
@ -477,7 +503,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
* Set sync state based on if we were asked to do data copy or
* not.
*/
table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
/*
* Get the table list from publisher and build local table status
@ -504,15 +530,15 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
* won't use the initial snapshot for anything, so no need to
* export it.
*/
if (create_slot)
if (opts.create_slot)
{
Assert(slotname);
Assert(opts.slot_name);
walrcv_create_slot(wrconn, slotname, false,
walrcv_create_slot(wrconn, opts.slot_name, false,
CRS_NOEXPORT_SNAPSHOT, NULL);
ereport(NOTICE,
(errmsg("created replication slot \"%s\" on publisher",
slotname)));
opts.slot_name)));
}
}
PG_FINALLY();
@ -529,7 +555,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
table_close(rel, RowExclusiveLock);
if (enabled)
if (opts.enabled)
ApplyLauncherWakeupAtCommit();
ObjectAddressSet(myself, SubscriptionRelationId, subid);
@ -764,6 +790,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
bool update_tuple = false;
Subscription *sub;
Form_pg_subscription form;
bits32 supported_opts;
SubOpts opts = {0};
rel = table_open(SubscriptionRelationId, RowExclusiveLock);
@ -799,59 +827,46 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
{
case ALTER_SUBSCRIPTION_OPTIONS:
{
char *slotname;
bool slotname_given;
char *synchronous_commit;
bool binary_given;
bool binary;
bool streaming_given;
bool streaming;
supported_opts = (SUBOPT_SLOT_NAME |
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
SUBOPT_STREAMING);
parse_subscription_options(stmt->options,
NULL, /* no "connect" */
NULL, NULL, /* no "enabled" */
NULL, /* no "create_slot" */
&slotname_given, &slotname,
NULL, /* no "copy_data" */
&synchronous_commit,
NULL, /* no "refresh" */
&binary_given, &binary,
&streaming_given, &streaming);
parse_subscription_options(stmt->options, supported_opts, &opts);
if (slotname_given)
if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
{
if (sub->enabled && !slotname)
if (sub->enabled && !opts.slot_name)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot set %s for enabled subscription",
"slot_name = NONE")));
if (slotname)
if (opts.slot_name)
values[Anum_pg_subscription_subslotname - 1] =
DirectFunctionCall1(namein, CStringGetDatum(slotname));
DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
else
nulls[Anum_pg_subscription_subslotname - 1] = true;
replaces[Anum_pg_subscription_subslotname - 1] = true;
}
if (synchronous_commit)
if (opts.synchronous_commit)
{
values[Anum_pg_subscription_subsynccommit - 1] =
CStringGetTextDatum(synchronous_commit);
CStringGetTextDatum(opts.synchronous_commit);
replaces[Anum_pg_subscription_subsynccommit - 1] = true;
}
if (binary_given)
if (IsSet(opts.specified_opts, SUBOPT_BINARY))
{
values[Anum_pg_subscription_subbinary - 1] =
BoolGetDatum(binary);
BoolGetDatum(opts.binary);
replaces[Anum_pg_subscription_subbinary - 1] = true;
}
if (streaming_given)
if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
{
values[Anum_pg_subscription_substream - 1] =
BoolGetDatum(streaming);
BoolGetDatum(opts.streaming);
replaces[Anum_pg_subscription_substream - 1] = true;
}
@ -861,31 +876,19 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
case ALTER_SUBSCRIPTION_ENABLED:
{
bool enabled,
enabled_given;
parse_subscription_options(stmt->options, SUBOPT_ENABLED, &opts);
Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
parse_subscription_options(stmt->options,
NULL, /* no "connect" */
&enabled_given, &enabled,
NULL, /* no "create_slot" */
NULL, NULL, /* no "slot_name" */
NULL, /* no "copy_data" */
NULL, /* no "synchronous_commit" */
NULL, /* no "refresh" */
NULL, NULL, /* no "binary" */
NULL, NULL); /* no streaming */
Assert(enabled_given);
if (!sub->slotname && enabled)
if (!sub->slotname && opts.enabled)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot enable subscription that does not have a slot name")));
values[Anum_pg_subscription_subenabled - 1] =
BoolGetDatum(enabled);
BoolGetDatum(opts.enabled);
replaces[Anum_pg_subscription_subenabled - 1] = true;
if (enabled)
if (opts.enabled)
ApplyLauncherWakeupAtCommit();
update_tuple = true;
@ -906,19 +909,9 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
case ALTER_SUBSCRIPTION_SET_PUBLICATION:
{
bool copy_data;
bool refresh;
supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
parse_subscription_options(stmt->options, supported_opts, &opts);
parse_subscription_options(stmt->options,
NULL, /* no "connect" */
NULL, NULL, /* no "enabled" */
NULL, /* no "create_slot" */
NULL, NULL, /* no "slot_name" */
&copy_data,
NULL, /* no "synchronous_commit" */
&refresh,
NULL, NULL, /* no "binary" */
NULL, NULL); /* no "streaming" */
values[Anum_pg_subscription_subpublications - 1] =
publicationListToArray(stmt->publication);
replaces[Anum_pg_subscription_subpublications - 1] = true;
@ -926,7 +919,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
update_tuple = true;
/* Refresh if user asked us to. */
if (refresh)
if (opts.refresh)
{
if (!sub->enabled)
ereport(ERROR,
@ -939,7 +932,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
/* Make sure refresh sees the new list of publications. */
sub->publications = stmt->publication;
AlterSubscription_refresh(sub, copy_data);
AlterSubscription_refresh(sub, opts.copy_data);
}
break;
@ -948,25 +941,16 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
{
bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
bool copy_data = false;
bool refresh;
List *publist;
bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
parse_subscription_options(stmt->options,
NULL, /* no "connect" */
NULL, NULL, /* no "enabled" */
NULL, /* no "create_slot" */
NULL, NULL, /* no "slot_name" */
isadd ? &copy_data : NULL, /* for drop, no
* "copy_data" */
NULL, /* no "synchronous_commit" */
&refresh,
NULL, NULL, /* no "binary" */
NULL, NULL); /* no "streaming" */
supported_opts = SUBOPT_REFRESH;
if (isadd)
supported_opts |= SUBOPT_COPY_DATA;
parse_subscription_options(stmt->options, supported_opts, &opts);
publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
values[Anum_pg_subscription_subpublications - 1] =
publicationListToArray(publist);
replaces[Anum_pg_subscription_subpublications - 1] = true;
@ -974,7 +958,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
update_tuple = true;
/* Refresh if user asked us to. */
if (refresh)
if (opts.refresh)
{
if (!sub->enabled)
ereport(ERROR,
@ -987,7 +971,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
/* Only refresh the added/dropped list of publications. */
sub->publications = stmt->publication;
AlterSubscription_refresh(sub, copy_data);
AlterSubscription_refresh(sub, opts.copy_data);
}
break;
@ -995,27 +979,16 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
case ALTER_SUBSCRIPTION_REFRESH:
{
bool copy_data;
if (!sub->enabled)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
parse_subscription_options(stmt->options,
NULL, /* no "connect" */
NULL, NULL, /* no "enabled" */
NULL, /* no "create_slot" */
NULL, NULL, /* no "slot_name" */
&copy_data,
NULL, /* no "synchronous_commit" */
NULL, /* no "refresh" */
NULL, NULL, /* no "binary" */
NULL, NULL); /* no "streaming" */
parse_subscription_options(stmt->options, SUBOPT_COPY_DATA, &opts);
PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
AlterSubscription_refresh(sub, copy_data);
AlterSubscription_refresh(sub, opts.copy_data);
break;
}

View File

@ -2511,6 +2511,7 @@ StringInfoData
StripnullState
SubLink
SubLinkType
SubOpts
SubPlan
SubPlanState
SubRemoveRels