diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out index 49ffaeea2d..c85e1a01b2 100644 --- a/contrib/test_decoding/expected/replorigin.out +++ b/contrib/test_decoding/expected/replorigin.out @@ -267,3 +267,59 @@ SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot_no_lsn (1 row) +-- Test that the pgoutput correctly filters changes corresponding to the provided origin value. +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pgoutput'); + ?column? +---------- + init +(1 row) + +CREATE PUBLICATION pub FOR TABLE target_tbl; +SELECT pg_replication_origin_create('regress_test_decoding: regression_slot'); + pg_replication_origin_create +------------------------------ + 1 +(1 row) + +-- mark session as replaying +SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot'); + pg_replication_origin_session_setup +------------------------------------- + +(1 row) + +INSERT INTO target_tbl(data) VALUES ('test data'); +-- The replayed change will be filtered. +SELECT count(*) = 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub', 'origin', 'none'); + ?column? +---------- + t +(1 row) + +-- The replayed change will be output if the origin value is not specified. +SELECT count(*) != 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub'); + ?column? +---------- + t +(1 row) + +-- Clean up +SELECT pg_replication_origin_session_reset(); + pg_replication_origin_session_reset +------------------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot'); + pg_replication_origin_drop +---------------------------- + +(1 row) + +DROP PUBLICATION pub; diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql index db06541f56..e71ee02d05 100644 --- a/contrib/test_decoding/sql/replorigin.sql +++ b/contrib/test_decoding/sql/replorigin.sql @@ -124,3 +124,25 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot_no_lsn', NULL, NUL SELECT pg_replication_origin_session_reset(); SELECT pg_drop_replication_slot('regression_slot_no_lsn'); SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot_no_lsn'); + +-- Test that the pgoutput correctly filters changes corresponding to the provided origin value. +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pgoutput'); +CREATE PUBLICATION pub FOR TABLE target_tbl; +SELECT pg_replication_origin_create('regress_test_decoding: regression_slot'); + +-- mark session as replaying +SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot'); + +INSERT INTO target_tbl(data) VALUES ('test data'); + +-- The replayed change will be filtered. +SELECT count(*) = 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub', 'origin', 'none'); + +-- The replayed change will be output if the origin value is not specified. +SELECT count(*) != 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub'); + +-- Clean up +SELECT pg_replication_origin_session_reset(); +SELECT pg_drop_replication_slot('regression_slot'); +SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot'); +DROP PUBLICATION pub; diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 3d2becb45c..251ba46da5 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -82,7 +82,6 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, static bool publications_valid; static bool in_streaming; -static bool publish_no_origin; static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, @@ -381,21 +380,23 @@ parse_output_parameters(List *options, PGOutputData *data) } else if (strcmp(defel->defname, "origin") == 0) { + char *origin; + if (origin_option_given) ereport(ERROR, errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options")); origin_option_given = true; - data->origin = defGetString(defel); - if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_NONE) == 0) - publish_no_origin = true; - else if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_ANY) == 0) - publish_no_origin = false; + origin = defGetString(defel); + if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0) + data->publish_no_origin = true; + else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0) + data->publish_no_origin = false; else ereport(ERROR, errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("unrecognized origin value: \"%s\"", data->origin)); + errmsg("unrecognized origin value: \"%s\"", origin)); } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); @@ -1673,7 +1674,9 @@ static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id) { - if (publish_no_origin && origin_id != InvalidRepOriginId) + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + + if (data->publish_no_origin && origin_id != InvalidRepOriginId) return true; return false; diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index b4a8015403..b3f9a01629 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -29,7 +29,7 @@ typedef struct PGOutputData char streaming; bool messages; bool two_phase; - char *origin; + bool publish_no_origin; } PGOutputData; #endif /* PGOUTPUT_H */