diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 79765f9696..1b993fb032 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -342,10 +342,6 @@ pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { char *origin; - /* Message boundary */ - OutputPluginWrite(ctx, false); - OutputPluginPrepareWrite(ctx, true); - /*---------- * XXX: which behaviour do we want here? * @@ -357,7 +353,13 @@ pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) *---------- */ if (replorigin_by_oid(txn->origin_id, true, &origin)) + { + /* Message boundary */ + OutputPluginWrite(ctx, false); + OutputPluginPrepareWrite(ctx, true); logicalrep_write_origin(ctx->out, origin, txn->origin_lsn); + } + } OutputPluginWrite(ctx, true); @@ -780,12 +782,13 @@ pgoutput_stream_start(struct LogicalDecodingContext *ctx, { char *origin; - /* Message boundary */ - OutputPluginWrite(ctx, false); - OutputPluginPrepareWrite(ctx, true); - if (replorigin_by_oid(txn->origin_id, true, &origin)) + { + /* Message boundary */ + OutputPluginWrite(ctx, false); + OutputPluginPrepareWrite(ctx, true); logicalrep_write_origin(ctx->out, origin, InvalidXLogRecPtr); + } } OutputPluginWrite(ctx, true); diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl index d1e407aacb..b8f46f08cc 100644 --- a/src/test/subscription/t/100_bugs.pl +++ b/src/test/subscription/t/100_bugs.pl @@ -153,3 +153,72 @@ is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t"), $rows * 2, "2x$rows rows in t"); is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t2"), $rows * 2, "2x$rows rows in t2"); + +# Verify table data is synced with cascaded replication setup. This is mainly +# to test whether the data written by tablesync worker gets replicated. +my $node_pub = get_new_node('testpublisher1'); +$node_pub->init(allows_streaming => 'logical'); +$node_pub->start; + +my $node_pub_sub = get_new_node('testpublisher_subscriber'); +$node_pub_sub->init(allows_streaming => 'logical'); +$node_pub_sub->start; + +my $node_sub = get_new_node('testsubscriber1'); +$node_sub->init(allows_streaming => 'logical'); +$node_sub->start; + +# Create the tables in all nodes. +$node_pub->safe_psql('postgres', "CREATE TABLE tab1 (a int)"); +$node_pub_sub->safe_psql('postgres', "CREATE TABLE tab1 (a int)"); +$node_sub->safe_psql('postgres', "CREATE TABLE tab1 (a int)"); + +# Create a cascaded replication setup like: +# N1 - Create publication testpub1. +# N2 - Create publication testpub2 and also include subscriber which subscribes +# to testpub1. +# N3 - Create subscription testsub2 subscribes to testpub2. +# +# Note that subscription on N3 needs to be created before subscription on N2 to +# test whether the data written by tablesync worker of N2 gets replicated. +$node_pub->safe_psql('postgres', + "CREATE PUBLICATION testpub1 FOR TABLE tab1"); + +$node_pub_sub->safe_psql('postgres', + "CREATE PUBLICATION testpub2 FOR TABLE tab1"); + +my $publisher1_connstr = $node_pub->connstr . ' dbname=postgres'; +my $publisher2_connstr = $node_pub_sub->connstr . ' dbname=postgres'; + +$node_sub->safe_psql('postgres', + "CREATE SUBSCRIPTION testsub2 CONNECTION '$publisher2_connstr' PUBLICATION testpub2" +); + +$node_pub_sub->safe_psql('postgres', + "CREATE SUBSCRIPTION testsub1 CONNECTION '$publisher1_connstr' PUBLICATION testpub1" +); + +$node_pub->safe_psql('postgres', + "INSERT INTO tab1 values(generate_series(1,10))"); + +# Verify that the data is cascaded from testpub1 to testsub1 and further from +# testpub2 (which had testsub1) to testsub2. +$node_pub->wait_for_catchup('testsub1'); +$node_pub_sub->wait_for_catchup('testsub2'); + +# Drop subscriptions as we don't need them anymore +$node_pub_sub->safe_psql('postgres', "DROP SUBSCRIPTION testsub1"); +$node_sub->safe_psql('postgres', "DROP SUBSCRIPTION testsub2"); + +# Drop publications as we don't need them anymore +$node_pub->safe_psql('postgres', "DROP PUBLICATION testpub1"); +$node_pub_sub->safe_psql('postgres', "DROP PUBLICATION testpub2"); + +# Clean up the tables on both publisher and subscriber as we don't need them +$node_pub->safe_psql('postgres', "DROP TABLE tab1"); +$node_pub_sub->safe_psql('postgres', "DROP TABLE tab1"); +$node_sub->safe_psql('postgres', "DROP TABLE tab1"); + +$node_pub->stop('fast'); +$node_pub_sub->stop('fast'); +$node_sub->stop('fast');