From 1046a69b3087a6417e85cae9b6bc76caa22f913b Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Tue, 24 Aug 2021 08:25:21 +0530 Subject: [PATCH] Fix Alter Subscription's Add/Drop Publication behavior. The current refresh behavior tries to just refresh added/dropped publications but that leads to removing wrong tables from subscription. We can't refresh just the dropped publication because it is quite possible that some of the tables are removed from publication by that time and now those will remain as part of the subscription. Also, there is a chance that the tables that were part of the publication being dropped are also part of another publication, so we can't remove those. So, we decided that by default, add/drop commands will also act like REFRESH PUBLICATION which means they will refresh all the publications. We can keep the old behavior for "add publication" but it is better to be consistent with "drop publication". Author: Hou Zhijie Reviewed-by: Masahiko Sawada, Amit Kapila Backpatch-through: 14, where it was introduced Discussion: https://postgr.es/m/OS0PR01MB5716935D4C2CC85A6143073F94EF9@OS0PR01MB5716.jpnprd01.prod.outlook.com --- doc/src/sgml/ref/alter_subscription.sgml | 7 +- src/backend/commands/subscriptioncmds.c | 9 +- src/bin/psql/tab-complete.c | 8 +- src/test/regress/expected/subscription.out | 3 - src/test/regress/sql/subscription.sql | 3 - src/test/subscription/t/024_add_drop_pub.pl | 98 +++++++++++++++++++++ 6 files changed, 105 insertions(+), 23 deletions(-) create mode 100644 src/test/subscription/t/024_add_drop_pub.pl diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index a6f994450d..835be0d2a4 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -111,9 +111,7 @@ ALTER SUBSCRIPTION name RENAME TO < publications, and DROP removes the publications from the list of publications. See for more information. By default, this command will also act like - REFRESH PUBLICATION, except that in case of - ADD or DROP, only the added or - dropped publications are refreshed. + REFRESH PUBLICATION. @@ -134,8 +132,7 @@ ALTER SUBSCRIPTION name RENAME TO < Additionally, refresh options as described - under REFRESH PUBLICATION may be specified, - except in the case of DROP PUBLICATION. + under REFRESH PUBLICATION may be specified. diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 5157f44058..c47ba26369 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1006,10 +1006,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, List *publist; bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION; - supported_opts = SUBOPT_REFRESH; - if (isadd) - supported_opts |= SUBOPT_COPY_DATA; - + supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA; parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1042,8 +1039,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh"); - /* Only refresh the added/dropped list of publications. */ - sub->publications = stmt->publication; + /* Refresh the new list of publications. */ + sub->publications = publist; AlterSubscription_refresh(sub, opts.copy_data); } diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 0750f70273..b48d193595 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1675,14 +1675,10 @@ psql_completion(const char *text, int start, int end) else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("ADD|DROP|SET", "PUBLICATION", MatchAny)) COMPLETE_WITH("WITH ("); - /* ALTER SUBSCRIPTION ADD|SET PUBLICATION WITH ( */ + /* ALTER SUBSCRIPTION ADD|DROP|SET PUBLICATION WITH ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && - TailMatches("ADD|SET", "PUBLICATION", MatchAny, "WITH", "(")) + TailMatches("ADD|DROP|SET", "PUBLICATION", MatchAny, "WITH", "(")) COMPLETE_WITH("copy_data", "refresh"); - /* ALTER SUBSCRIPTION DROP PUBLICATION WITH ( */ - else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && - TailMatches("DROP", "PUBLICATION", MatchAny, "WITH", "(")) - COMPLETE_WITH("refresh"); /* ALTER SCHEMA */ else if (Matches("ALTER", "SCHEMA", MatchAny)) diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 77b4437b69..15a1ac6398 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -230,9 +230,6 @@ ERROR: cannot drop all the publications from a subscription -- fail - publication does not exist in subscription ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub3 WITH (refresh = false); ERROR: publication "testpub3" is not in subscription "regress_testsub" --- fail - do not support copy_data option -ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1 WITH (refresh = false, copy_data = true); -ERROR: unrecognized subscription parameter: "copy_data" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index d42104c191..7faa935a2a 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -171,9 +171,6 @@ ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub, testpub1, testpub2 -- fail - publication does not exist in subscription ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub3 WITH (refresh = false); --- fail - do not support copy_data option -ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1 WITH (refresh = false, copy_data = true); - -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); diff --git a/src/test/subscription/t/024_add_drop_pub.pl b/src/test/subscription/t/024_add_drop_pub.pl new file mode 100644 index 0000000000..24493a9c4e --- /dev/null +++ b/src/test/subscription/t/024_add_drop_pub.pl @@ -0,0 +1,98 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 3; + +# Initialize publisher node +my $node_publisher = PostgresNode->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = PostgresNode->new('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create table on publisher +$node_publisher->safe_psql('postgres', "CREATE TABLE tab_1 (a int)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_1 SELECT generate_series(1,10)"); + +# Create table on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_1 (a int)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_1 FOR TABLE tab_1"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_2"); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub_1, tap_pub_2" +); + +# Wait for initial table sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; + +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +$node_publisher->wait_for_catchup('tap_sub'); + +# Check the initial data of tab_1 is copied to subscriber +my $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_1"); +is($result, qq(10|1|10), 'check initial data is copied to subscriber'); + +# Create a new table on publisher +$node_publisher->safe_psql('postgres', "CREATE TABLE tab_2 (a int)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_2 SELECT generate_series(1,10)"); + +# Create a new table on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_2 (a int)"); + +# Add the table to publication +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_2 ADD TABLE tab_2"); + +# Dropping tap_pub_1 will refresh the entire publication list +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub DROP PUBLICATION tap_pub_1"); + +# Wait for initial table sync to finish +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +$node_publisher->wait_for_catchup('tap_sub'); + +# Check the initial data of tab_drop_refresh was copied to subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_2"); +is($result, qq(10|1|10), 'check initial data is copied to subscriber'); + +# Re-adding tap_pub_1 will refresh the entire publication list +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub ADD PUBLICATION tap_pub_1"); + +# Wait for initial table sync to finish +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +$node_publisher->wait_for_catchup('tap_sub'); + +# Check the initial data of tab_1 was copied to subscriber again +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_1"); +is($result, qq(20|1|10), 'check initial data is copied to subscriber'); + +# shutdown +$node_subscriber->stop('fast'); +$node_publisher->stop('fast');