Fix ALTER PUBLICATION...DROP TABLE behavior.

Commit 69bd60672 fixed the initialization of streamed transactions for
RelationSyncEntry. It forgot to initialize the publication actions while
invalidating the RelationSyncEntry due to which even though the relation
is dropped from a particular publication we still publish its changes. Fix
it by initializing pubactions when entry got invalidated.

Author: Japin Li and Bharath Rupireddy
Reviewed-by: Amit Kapila
Discussion: https://postgr.es/m/CALj2ACV+0UFpcZs5czYgBpujM9p0Hg1qdOZai_43OU7bqHU_xw@mail.gmail.com
This commit is contained in:
Amit Kapila 2021-01-25 07:39:29 +05:30
parent a4b03de589
commit 40ab64c1ec
2 changed files with 105 additions and 1 deletions

View File

@ -1179,5 +1179,16 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
*/
hash_seq_init(&status, RelationSyncCache);
while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
{
entry->replicate_valid = false;
/*
* There might be some relations dropped from the publication so we
* don't need to publish the changes for them.
*/
entry->pubactions.pubinsert = false;
entry->pubactions.pubupdate = false;
entry->pubactions.pubdelete = false;
entry->pubactions.pubtruncate = false;
}
}

View File

@ -3,7 +3,7 @@ use strict;
use warnings;
use PostgresNode;
use TestLib;
use Test::More tests => 23;
use Test::More tests => 27;
# Initialize publisher node
my $node_publisher = get_new_node('publisher');
@ -153,6 +153,99 @@ is($result, qq(20|-20|-1),
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_full SELECT generate_series(1,10)");
# Test behaviour of ALTER PUBLICATION ... DROP TABLE
#
# When a publisher drops a table from publication, it should also stop sending
# its changes to subscribers. We look at the subscriber whether it receives
# the row that is inserted to the table in the publisher after it is dropped
# from the publication.
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), min(a), max(a) FROM tab_ins");
is($result, qq(1052|1|1002), 'check rows on subscriber before table drop from publication');
# Drop the table from publication
$node_publisher->safe_psql('postgres',
"ALTER PUBLICATION tap_pub_ins_only DROP TABLE tab_ins");
# Insert a row in publisher, but publisher will not send this row to subscriber
$node_publisher->safe_psql('postgres', "INSERT INTO tab_ins VALUES(8888)");
$node_publisher->wait_for_catchup('tap_sub');
# Subscriber will not receive the inserted row, after table is dropped from
# publication, so row count should remain the same.
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), min(a), max(a) FROM tab_ins");
is($result, qq(1052|1|1002), 'check rows on subscriber after table drop from publication');
# Delete the inserted row in publisher
$node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a = 8888");
# Add the table to publication again
$node_publisher->safe_psql('postgres',
"ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_ins");
# Refresh publication after table is added to publication
$node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION");
# Test replication with multiple publications for a subscription such that the
# operations are performed on the table from the first publication in the list.
# Create tables on publisher
$node_publisher->safe_psql('postgres', "CREATE TABLE temp1 (a int)");
$node_publisher->safe_psql('postgres', "CREATE TABLE temp2 (a int)");
# Create tables on subscriber
$node_subscriber->safe_psql('postgres', "CREATE TABLE temp1 (a int)");
$node_subscriber->safe_psql('postgres', "CREATE TABLE temp2 (a int)");
# Setup logical replication that will only be used for this test
$node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub_temp1 FOR TABLE temp1 WITH (publish = insert)");
$node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub_temp2 FOR TABLE temp2");
$node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub_temp1 CONNECTION '$publisher_connstr' PUBLICATION tap_pub_temp1, tap_pub_temp2"
);
$node_publisher->wait_for_catchup('tap_sub_temp1');
# Also wait for initial table sync to finish
$synced_query =
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
# Subscriber table will have no rows initially
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM temp1");
is($result, qq(0), 'check initial rows on subscriber with multiple publications');
# Insert a row into the table that's part of first publication in subscriber
# list of publications.
$node_publisher->safe_psql('postgres', "INSERT INTO temp1 VALUES (1)");
$node_publisher->wait_for_catchup('tap_sub_temp1');
# Subscriber should receive the inserted row
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM temp1");
is($result, qq(1), 'check rows on subscriber with multiple publications');
# Drop subscription as we don't need it anymore
$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_temp1");
# Drop publications as we don't need them anymore
$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_temp1");
$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_temp2");
# Clean up the tables on both publisher and subscriber as we don't need them
$node_publisher->safe_psql('postgres', "DROP TABLE temp1");
$node_publisher->safe_psql('postgres', "DROP TABLE temp2");
$node_subscriber->safe_psql('postgres', "DROP TABLE temp1");
$node_subscriber->safe_psql('postgres', "DROP TABLE temp2");
# add REPLICA IDENTITY FULL so we can update
$node_publisher->safe_psql('postgres',
"ALTER TABLE tab_full REPLICA IDENTITY FULL");