diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index ed9a3d6c0e..f23f15b04d 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -7,7 +7,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ decoding_into_rel binary prepared replorigin time messages \ spill slot truncate stream ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ - oldest_xmin snapshot_transfer subxact_without_top + oldest_xmin snapshot_transfer subxact_without_top concurrent_stream REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf diff --git a/contrib/test_decoding/expected/concurrent_stream.out b/contrib/test_decoding/expected/concurrent_stream.out new file mode 100644 index 0000000000..e731d13d8f --- /dev/null +++ b/contrib/test_decoding/expected/concurrent_stream.out @@ -0,0 +1,19 @@ +Parsed test spec with 2 sessions + +starting permutation: s0_begin s0_ddl s1_ddl s1_begin s1_toast_insert s1_commit s1_get_stream_changes +step s0_begin: BEGIN; +step s0_ddl: CREATE TABLE stream_test1(data text); +step s1_ddl: CREATE TABLE stream_test(data text); +step s1_begin: BEGIN; +step s1_toast_insert: INSERT INTO stream_test SELECT large_val(); +step s1_commit: COMMIT; +step s1_get_stream_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); +data + +opening a streamed block for transaction +streaming change for transaction +closing a streamed block for transaction +committing streamed transaction +?column? + +stop diff --git a/contrib/test_decoding/expected/stream.out b/contrib/test_decoding/expected/stream.out index d7e32f8185..e1c3bc838d 100644 --- a/contrib/test_decoding/expected/stream.out +++ b/contrib/test_decoding/expected/stream.out @@ -29,10 +29,7 @@ COMMIT; SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); data ---------------------------------------------------------- - opening a streamed block for transaction streaming message: transactional: 1 prefix: test, sz: 50 - closing a streamed block for transaction - aborting streamed (sub)transaction opening a streamed block for transaction streaming change for transaction streaming change for transaction @@ -56,7 +53,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl streaming change for transaction closing a streamed block for transaction committing streamed transaction -(27 rows) +(24 rows) -- streaming test for toast changes ALTER TABLE stream_test ALTER COLUMN data set storage external; diff --git a/contrib/test_decoding/specs/concurrent_stream.spec b/contrib/test_decoding/specs/concurrent_stream.spec new file mode 100644 index 0000000000..ad9fde9c28 --- /dev/null +++ b/contrib/test_decoding/specs/concurrent_stream.spec @@ -0,0 +1,37 @@ +# Test decoding of in-progress transaction containing dml and a concurrent +# transaction with ddl operation. The transaction containing ddl operation +# should not get streamed as it doesn't have any changes. + +setup +{ + SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); + + -- consume DDL + SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + CREATE OR REPLACE FUNCTION large_val() RETURNS TEXT LANGUAGE SQL AS 'select array_agg(md5(g::text))::text from generate_series(1, 80000) g'; +} + +teardown +{ + DROP TABLE IF EXISTS stream_test; + DROP TABLE IF EXISTS stream_test1; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "s0" +setup { SET synchronous_commit=on; } +step "s0_begin" { BEGIN; } +step "s0_ddl" {CREATE TABLE stream_test1(data text);} + +# The transaction commit for s1_ddl will add the INTERNAL_SNAPSHOT change to +# the currently running s0_ddl and we want to test that s0_ddl should not get +# streamed when user asked to skip-empty-xacts. +session "s1" +setup { SET synchronous_commit=on; } +step "s1_ddl" { CREATE TABLE stream_test(data text); } +step "s1_begin" { BEGIN; } +step "s1_toast_insert" {INSERT INTO stream_test SELECT large_val();} +step "s1_commit" { COMMIT; } +step "s1_get_stream_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');} + +permutation "s0_begin" "s0_ddl" "s1_ddl" "s1_begin" "s1_toast_insert" "s1_commit" "s1_get_stream_changes" diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 34745150e9..e60ab34a5a 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -64,6 +64,10 @@ static void pg_decode_message(LogicalDecodingContext *ctx, Size sz, const char *message); static void pg_decode_stream_start(LogicalDecodingContext *ctx, ReorderBufferTXN *txn); +static void pg_output_stream_start(LogicalDecodingContext *ctx, + TestDecodingData *data, + ReorderBufferTXN *txn, + bool last_write); static void pg_decode_stream_stop(LogicalDecodingContext *ctx, ReorderBufferTXN *txn); static void pg_decode_stream_abort(LogicalDecodingContext *ctx, @@ -583,34 +587,38 @@ pg_decode_message(LogicalDecodingContext *ctx, OutputPluginWrite(ctx, true); } -/* - * We never try to stream any empty xact so we don't need any special handling - * for skip_empty_xacts in streaming mode APIs. - */ static void pg_decode_stream_start(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { TestDecodingData *data = ctx->output_plugin_private; - OutputPluginPrepareWrite(ctx, true); + data->xact_wrote_changes = false; + if (data->skip_empty_xacts) + return; + pg_output_stream_start(ctx, data, txn, true); +} + +static void +pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write) +{ + OutputPluginPrepareWrite(ctx, last_write); if (data->include_xids) appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid); else appendStringInfo(ctx->out, "opening a streamed block for transaction"); - OutputPluginWrite(ctx, true); + OutputPluginWrite(ctx, last_write); } -/* - * We never try to stream any empty xact so we don't need any special handling - * for skip_empty_xacts in streaming mode APIs. - */ static void pg_decode_stream_stop(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { TestDecodingData *data = ctx->output_plugin_private; + if (data->skip_empty_xacts && !data->xact_wrote_changes) + return; + OutputPluginPrepareWrite(ctx, true); if (data->include_xids) appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid); @@ -619,10 +627,6 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx, OutputPluginWrite(ctx, true); } -/* - * We never try to stream any empty xact so we don't need any special handling - * for skip_empty_xacts in streaming mode APIs. - */ static void pg_decode_stream_abort(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, @@ -630,6 +634,9 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx, { TestDecodingData *data = ctx->output_plugin_private; + if (data->skip_empty_xacts && !data->xact_wrote_changes) + return; + OutputPluginPrepareWrite(ctx, true); if (data->include_xids) appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid); @@ -638,10 +645,6 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx, OutputPluginWrite(ctx, true); } -/* - * We never try to stream any empty xact so we don't need any special handling - * for skip_empty_xacts in streaming mode APIs. - */ static void pg_decode_stream_commit(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, @@ -649,6 +652,9 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx, { TestDecodingData *data = ctx->output_plugin_private; + if (data->skip_empty_xacts && !data->xact_wrote_changes) + return; + OutputPluginPrepareWrite(ctx, true); if (data->include_xids) @@ -676,6 +682,13 @@ pg_decode_stream_change(LogicalDecodingContext *ctx, { TestDecodingData *data = ctx->output_plugin_private; + /* output stream start if we haven't yet */ + if (data->skip_empty_xacts && !data->xact_wrote_changes) + { + pg_output_stream_start(ctx, data, txn, false); + } + data->xact_wrote_changes = true; + OutputPluginPrepareWrite(ctx, true); if (data->include_xids) appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid); @@ -722,6 +735,12 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, { TestDecodingData *data = ctx->output_plugin_private; + if (data->skip_empty_xacts && !data->xact_wrote_changes) + { + pg_output_stream_start(ctx, data, txn, false); + } + data->xact_wrote_changes = true; + OutputPluginPrepareWrite(ctx, true); if (data->include_xids) appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);