Refactor function parse_output_parameters.

Instead of using multiple parameters in parse_ouput_parameters function
signature, use the struct PGOutputData that encapsulates all pgoutput
options. It will be useful for future work where we need to add other
options in pgoutput.

Author: Euler Taveira
Reviewed-by: Amit Kapila
Discussion: https://postgr.es/m/CADK3HHJ-+9SO7KuRLH=9Wa1rAo60Yreq1GFNkH_kd0=CdaWM+A@mail.gmail.com
This commit is contained in:
Amit Kapila 2021-04-06 08:26:31 +05:30
parent 6d41dd045a
commit 531737ddad
2 changed files with 10 additions and 15 deletions

View File

@ -156,9 +156,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
} }
static void static void
parse_output_parameters(List *options, uint32 *protocol_version, parse_output_parameters(List *options, PGOutputData *data)
List **publication_names, bool *binary,
bool *enable_streaming)
{ {
ListCell *lc; ListCell *lc;
bool protocol_version_given = false; bool protocol_version_given = false;
@ -166,7 +164,8 @@ parse_output_parameters(List *options, uint32 *protocol_version,
bool binary_option_given = false; bool binary_option_given = false;
bool streaming_given = false; bool streaming_given = false;
*binary = false; data->binary = false;
data->streaming = false;
foreach(lc, options) foreach(lc, options)
{ {
@ -196,7 +195,7 @@ parse_output_parameters(List *options, uint32 *protocol_version,
errmsg("proto_version \"%s\" out of range", errmsg("proto_version \"%s\" out of range",
strVal(defel->arg)))); strVal(defel->arg))));
*protocol_version = (uint32) parsed; data->protocol_version = (uint32) parsed;
} }
else if (strcmp(defel->defname, "publication_names") == 0) else if (strcmp(defel->defname, "publication_names") == 0)
{ {
@ -207,7 +206,7 @@ parse_output_parameters(List *options, uint32 *protocol_version,
publication_names_given = true; publication_names_given = true;
if (!SplitIdentifierString(strVal(defel->arg), ',', if (!SplitIdentifierString(strVal(defel->arg), ',',
publication_names)) &data->publication_names))
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_INVALID_NAME), (errcode(ERRCODE_INVALID_NAME),
errmsg("invalid publication_names syntax"))); errmsg("invalid publication_names syntax")));
@ -220,7 +219,7 @@ parse_output_parameters(List *options, uint32 *protocol_version,
errmsg("conflicting or redundant options"))); errmsg("conflicting or redundant options")));
binary_option_given = true; binary_option_given = true;
*binary = defGetBoolean(defel); data->binary = defGetBoolean(defel);
} }
else if (strcmp(defel->defname, "streaming") == 0) else if (strcmp(defel->defname, "streaming") == 0)
{ {
@ -230,7 +229,7 @@ parse_output_parameters(List *options, uint32 *protocol_version,
errmsg("conflicting or redundant options"))); errmsg("conflicting or redundant options")));
streaming_given = true; streaming_given = true;
*enable_streaming = defGetBoolean(defel); data->streaming = defGetBoolean(defel);
} }
else else
elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
@ -244,7 +243,6 @@ static void
pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
bool is_init) bool is_init)
{ {
bool enable_streaming = false;
PGOutputData *data = palloc0(sizeof(PGOutputData)); PGOutputData *data = palloc0(sizeof(PGOutputData));
/* Create our memory context for private allocations. */ /* Create our memory context for private allocations. */
@ -265,11 +263,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
if (!is_init) if (!is_init)
{ {
/* Parse the params and ERROR if we see any we don't recognize */ /* Parse the params and ERROR if we see any we don't recognize */
parse_output_parameters(ctx->output_plugin_options, parse_output_parameters(ctx->output_plugin_options, data);
&data->protocol_version,
&data->publication_names,
&data->binary,
&enable_streaming);
/* Check if we support requested protocol */ /* Check if we support requested protocol */
if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM) if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
@ -295,7 +289,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
* we only allow it with sufficient version of the protocol, and when * we only allow it with sufficient version of the protocol, and when
* the output plugin supports it. * the output plugin supports it.
*/ */
if (!enable_streaming) if (!data->streaming)
ctx->streaming = false; ctx->streaming = false;
else if (data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM) else if (data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
ereport(ERROR, ereport(ERROR,

View File

@ -25,6 +25,7 @@ typedef struct PGOutputData
List *publication_names; List *publication_names;
List *publications; List *publications;
bool binary; bool binary;
bool streaming;
} PGOutputData; } PGOutputData;
#endif /* PGOUTPUT_H */ #endif /* PGOUTPUT_H */