From 875693019053b8897ec3983e292acbb439b088c3 Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Thu, 8 Sep 2022 06:54:13 +0530 Subject: [PATCH] Raise a warning if there is a possibility of data from multiple origins. This commit raises a warning message for a combination of options ('copy_data = true' and 'origin = none') during CREATE/ALTER subscription operations if the publication tables were also replicated from other publishers. During replication, we can skip the data from other origins as we have that information in WAL but that is not possible during initial sync so we raise a warning if there is such a possibility. Author: Vignesh C Reviewed-By: Peter Smith, Amit Kapila, Jonathan Katz, Shi yu, Wang wei Discussion: https://www.postgresql.org/message-id/CALDaNm0gwjY_4HFxvvty01BOT01q_fJLKQ3pWP9=9orqubhjcQ@mail.gmail.com --- doc/src/sgml/ref/alter_subscription.sgml | 5 + doc/src/sgml/ref/create_subscription.sgml | 35 ++++++ src/backend/commands/subscriptioncmds.c | 133 +++++++++++++++++++++- src/test/subscription/t/030_origin.pl | 114 +++++++++++++++---- 4 files changed, 258 insertions(+), 29 deletions(-) diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 64efc21f53..1e8d72062b 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -172,6 +172,11 @@ ALTER SUBSCRIPTION name RENAME TO < Previously subscribed tables are not copied, even if a table's row filter WHERE clause has since been modified. + + See for details of + how copy_data = true can interact with the + origin parameter. + diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 7390c715bc..4e001f8111 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -213,6 +213,11 @@ CREATE SUBSCRIPTION subscription_name for details. + + See for details of how + copy_data = true can interact with the + origin parameter. + @@ -315,6 +320,11 @@ CREATE SUBSCRIPTION subscription_nameany means that the publisher sends changes regardless of their origin. The default is any. + + See for details of how + copy_data = true can interact with the + origin parameter. + @@ -386,6 +396,31 @@ CREATE SUBSCRIPTION subscription_name + + When using a subscription parameter combination of + copy_data = true and origin = NONE, + the initial sync table data is copied directly from the publisher, meaning + that knowledge of the true origin of that data is not possible. If the + publisher also has subscriptions then the copied table data might have + originated from further upstream. This scenario is detected and a WARNING is + logged to the user, but the warning is only an indication of a potential + problem; it is the user's responsibility to make the necessary checks to + ensure the copied data origins are really as wanted or not. + + + + To find which tables might potentially include non-local origins (due to + other subscriptions created on the publisher) try this SQL query: + +# substitute <pub-names> below with your publication name(s) to be queried +SELECT DISTINCT N.nspname AS schemaname, C.relname AS tablename +FROM pg_publication P, + LATERAL pg_get_publication_tables(P.pubname) GPT + JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid), + pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace) +WHERE C.oid = GPT.relid AND P.pubname IN (<pub-names>); + + diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index f87796e5af..66d800f0cf 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -92,6 +92,10 @@ typedef struct SubOpts } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); +static void check_publications_origin(WalReceiverConn *wrconn, + List *publications, bool copydata, + char *origin, Oid *subrel_local_oids, + int subrel_count, char *subname); static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); @@ -680,6 +684,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, PG_TRY(); { check_publications(wrconn, publications); + check_publications_origin(wrconn, publications, opts.copy_data, + opts.origin, NULL, 0, stmt->subname); /* * Set sync state based on if we were asked to do data copy or @@ -786,6 +792,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, ListCell *lc; int off; int remove_rel_len; + int subrel_count; Relation rel = NULL; typedef struct SubRemoveRels { @@ -815,13 +822,14 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, /* Get local table list. */ subrel_states = GetSubscriptionRelations(sub->oid, false); + subrel_count = list_length(subrel_states); /* * Build qsorted array of local table oids for faster lookup. This can * potentially contain all tables in the database so speed of lookup * is important. */ - subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid)); + subrel_local_oids = palloc(subrel_count * sizeof(Oid)); off = 0; foreach(lc, subrel_states) { @@ -829,14 +837,18 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, subrel_local_oids[off++] = relstate->relid; } - qsort(subrel_local_oids, list_length(subrel_states), + qsort(subrel_local_oids, subrel_count, sizeof(Oid), oid_cmp); + check_publications_origin(wrconn, sub->publications, copy_data, + sub->origin, subrel_local_oids, + subrel_count, sub->name); + /* * Rels that we want to remove from subscription and drop any slots * and origins corresponding to them. */ - sub_remove_rels = palloc(list_length(subrel_states) * sizeof(SubRemoveRels)); + sub_remove_rels = palloc(subrel_count * sizeof(SubRemoveRels)); /* * Walk over the remote tables and try to match them to locally known @@ -862,7 +874,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, pubrel_local_oids[off++] = relid; if (!bsearch(&relid, subrel_local_oids, - list_length(subrel_states), sizeof(Oid), oid_cmp)) + subrel_count, sizeof(Oid), oid_cmp)) { AddSubscriptionRelState(sub->oid, relid, copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, @@ -881,7 +893,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, sizeof(Oid), oid_cmp); remove_rel_len = 0; - for (off = 0; off < list_length(subrel_states); off++) + for (off = 0; off < subrel_count; off++) { Oid relid = subrel_local_oids[off]; @@ -1784,6 +1796,117 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId) table_close(rel, RowExclusiveLock); } +/* + * Check and log a warning if the publisher has subscribed to the same table + * from some other publisher. This check is required only if "copy_data = true" + * and "origin = none" for CREATE SUBSCRIPTION and + * ALTER SUBSCRIPTION ... REFRESH statements to notify the user that data + * having origin might have been copied. + * + * This check need not be performed on the tables that are already added + * because incremental sync for those tables will happen through WAL and the + * origin of the data can be identified from the WAL records. + * + * subrel_local_oids contains the list of relation oids that are already + * present on the subscriber. + */ +static void +check_publications_origin(WalReceiverConn *wrconn, List *publications, + bool copydata, char *origin, Oid *subrel_local_oids, + int subrel_count, char *subname) +{ + WalRcvExecResult *res; + StringInfoData cmd; + TupleTableSlot *slot; + Oid tableRow[1] = {TEXTOID}; + List *publist = NIL; + int i; + + if (!copydata || !origin || + (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0)) + return; + + initStringInfo(&cmd); + appendStringInfoString(&cmd, + "SELECT DISTINCT P.pubname AS pubname\n" + "FROM pg_publication P,\n" + " LATERAL pg_get_publication_tables(P.pubname) GPT\n" + " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n" + " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n" + "WHERE C.oid = GPT.relid AND P.pubname IN ("); + get_publications_str(publications, &cmd, true); + appendStringInfoString(&cmd, ")\n"); + + /* + * In case of ALTER SUBSCRIPTION ... REFRESH, subrel_local_oids contains + * the list of relation oids that are already present on the subscriber. + * This check should be skipped for these tables. + */ + for (i = 0; i < subrel_count; i++) + { + Oid relid = subrel_local_oids[i]; + char *schemaname = get_namespace_name(get_rel_namespace(relid)); + char *tablename = get_rel_name(relid); + + appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n", + schemaname, tablename); + } + + res = walrcv_exec(wrconn, cmd.data, 1, tableRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not receive list of replicated tables from the publisher: %s", + res->err))); + + /* Process tables. */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + char *pubname; + bool isnull; + + pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + ExecClearTuple(slot); + publist = list_append_unique(publist, makeString(pubname)); + } + + /* + * Log a warning if the publisher has subscribed to the same table from + * some other publisher. We cannot know the origin of data during the + * initial sync. Data origins can be found only from the WAL by looking at + * the origin id. + * + * XXX: For simplicity, we don't check whether the table has any data or + * not. If the table doesn't have any data then we don't need to + * distinguish between data having origin and data not having origin so we + * can avoid logging a warning in that case. + */ + if (publist) + { + StringInfo pubnames = makeStringInfo(); + + /* Prepare the list of publication(s) for warning message. */ + get_publications_str(publist, pubnames, false); + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin", + subname), + errdetail_plural("Subscribed publication %s is subscribing to other publications.", + "Subscribed publications %s are subscribing to other publications.", + list_length(publist), pubnames->data), + errhint("Verify that initial data copied from the publisher tables did not come from other origins.")); + } + + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); +} + /* * Get the list of tables which belong to specified publications on the * publisher connection. diff --git a/src/test/subscription/t/030_origin.pl b/src/test/subscription/t/030_origin.pl index b297a51f7c..0a5cc4503b 100644 --- a/src/test/subscription/t/030_origin.pl +++ b/src/test/subscription/t/030_origin.pl @@ -1,13 +1,23 @@ # Copyright (c) 2021-2022, PostgreSQL Global Development Group -# Test the CREATE SUBSCRIPTION 'origin' parameter. +# Test the CREATE SUBSCRIPTION 'origin' parameter and its interaction with +# 'copy_data' parameter. use strict; use warnings; use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; +my $subname_AB = 'tap_sub_A_B'; +my $subname_AB2 = 'tap_sub_A_B_2'; +my $subname_BA = 'tap_sub_B_A'; +my $subname_BC = 'tap_sub_B_C'; + +my $result; +my $stdout; +my $stderr; + ############################################################################### # Setup a bidirectional logical replication between node_A & node_B ############################################################################### @@ -32,33 +42,29 @@ $node_B->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)"); # node_A (pub) -> node_B (sub) my $node_A_connstr = $node_A->connstr . ' dbname=postgres'; $node_A->safe_psql('postgres', "CREATE PUBLICATION tap_pub_A FOR TABLE tab"); -my $appname_B1 = 'tap_sub_B1'; $node_B->safe_psql( 'postgres', " - CREATE SUBSCRIPTION tap_sub_B1 - CONNECTION '$node_A_connstr application_name=$appname_B1' + CREATE SUBSCRIPTION $subname_BA + CONNECTION '$node_A_connstr application_name=$subname_BA' PUBLICATION tap_pub_A WITH (origin = none)"); # node_B (pub) -> node_A (sub) my $node_B_connstr = $node_B->connstr . ' dbname=postgres'; $node_B->safe_psql('postgres', "CREATE PUBLICATION tap_pub_B FOR TABLE tab"); -my $appname_A = 'tap_sub_A'; $node_A->safe_psql( 'postgres', " - CREATE SUBSCRIPTION tap_sub_A - CONNECTION '$node_B_connstr application_name=$appname_A' + CREATE SUBSCRIPTION $subname_AB + CONNECTION '$node_B_connstr application_name=$subname_AB' PUBLICATION tap_pub_B WITH (origin = none, copy_data = off)"); # Wait for initial table sync to finish -$node_A->wait_for_subscription_sync($node_B, $appname_A); -$node_B->wait_for_subscription_sync($node_A, $appname_B1); +$node_A->wait_for_subscription_sync($node_B, $subname_AB); +$node_B->wait_for_subscription_sync($node_A, $subname_BA); is(1, 1, 'Bidirectional replication setup is complete'); -my $result; - ############################################################################### # Check that bidirectional logical replication setup does not cause infinite # recursive insertion. @@ -68,8 +74,8 @@ my $result; $node_A->safe_psql('postgres', "INSERT INTO tab VALUES (11);"); $node_B->safe_psql('postgres', "INSERT INTO tab VALUES (21);"); -$node_A->wait_for_catchup($appname_B1); -$node_B->wait_for_catchup($appname_A); +$node_A->wait_for_catchup($subname_BA); +$node_B->wait_for_catchup($subname_AB); # check that transaction was committed on subscriber(s) $result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;"); @@ -85,8 +91,8 @@ is( $result, qq(11 $node_A->safe_psql('postgres', "DELETE FROM tab;"); -$node_A->wait_for_catchup($appname_B1); -$node_B->wait_for_catchup($appname_A); +$node_A->wait_for_catchup($subname_BA); +$node_B->wait_for_catchup($subname_AB); ############################################################################### # Check that remote data of node_B (that originated from node_C) is not @@ -109,23 +115,20 @@ $node_C->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)"); # node_C (pub) -> node_B (sub) my $node_C_connstr = $node_C->connstr . ' dbname=postgres'; $node_C->safe_psql('postgres', "CREATE PUBLICATION tap_pub_C FOR TABLE tab"); - -my $appname_B2 = 'tap_sub_B2'; $node_B->safe_psql( 'postgres', " - CREATE SUBSCRIPTION tap_sub_B2 - CONNECTION '$node_C_connstr application_name=$appname_B2' + CREATE SUBSCRIPTION $subname_BC + CONNECTION '$node_C_connstr application_name=$subname_BC' PUBLICATION tap_pub_C WITH (origin = none)"); - -$node_B->wait_for_subscription_sync($node_C, $appname_B2); +$node_B->wait_for_subscription_sync($node_C, $subname_BC); # insert a record $node_C->safe_psql('postgres', "INSERT INTO tab VALUES (32);"); -$node_C->wait_for_catchup($appname_B2); -$node_B->wait_for_catchup($appname_A); -$node_A->wait_for_catchup($appname_B1); +$node_C->wait_for_catchup($subname_BC); +$node_B->wait_for_catchup($subname_AB); +$node_A->wait_for_catchup($subname_BA); $result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;"); is($result, qq(32), 'The node_C data replicated to node_B'); @@ -136,6 +139,69 @@ is($result, qq(), 'Remote data originating from another node (not the publisher) is not replicated when origin parameter is none' ); +############################################################################### +# Specifying origin = NONE indicates that the publisher should only replicate the +# changes that are generated locally from node_B, but in this case since the +# node_B is also subscribing data from node_A, node_B can have remotely +# originated data from node_A. We log a warning, in this case, to draw +# attention to there being possible remote data. +############################################################################### +($result, $stdout, $stderr) = $node_A->psql( + 'postgres', " + CREATE SUBSCRIPTION $subname_AB2 + CONNECTION '$node_B_connstr application_name=$subname_AB2' + PUBLICATION tap_pub_B + WITH (origin = none, copy_data = on)"); +like( + $stderr, + qr/WARNING: ( [A-Z0-9]+:)? subscription "tap_sub_a_b_2" requested copy_data with origin = NONE but might copy data that had a different origin/, + "Create subscription with origin = none and copy_data when the publisher has subscribed same table" +); + +$node_A->wait_for_subscription_sync($node_B, $subname_AB2); + +# Alter subscription ... refresh publication should be successful when no new +# table is added +$node_A->safe_psql( + 'postgres', " + ALTER SUBSCRIPTION $subname_AB2 REFRESH PUBLICATION"); + +# Check Alter subscription ... refresh publication when there is a new +# table that is subscribing data from a different publication +$node_A->safe_psql('postgres', "CREATE TABLE tab_new (a int PRIMARY KEY)"); +$node_B->safe_psql('postgres', "CREATE TABLE tab_new (a int PRIMARY KEY)"); + +# add a new table to the publication +$node_A->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_A ADD TABLE tab_new"); +$node_B->safe_psql( + 'postgres', " + ALTER SUBSCRIPTION $subname_BA REFRESH PUBLICATION"); + +$node_B->wait_for_subscription_sync($node_A, $subname_BA); + +# add a new table to the publication +$node_B->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_B ADD TABLE tab_new"); + +# Alter subscription ... refresh publication should log a warning when a new +# table in the publisher is subscribing data from a different publication +($result, $stdout, $stderr) = $node_A->psql( + 'postgres', " + ALTER SUBSCRIPTION $subname_AB2 REFRESH PUBLICATION"); +like( + $stderr, + qr/WARNING: ( [A-Z0-9]+:)? subscription "tap_sub_a_b_2" requested copy_data with origin = NONE but might copy data that had a different origin/, + "Refresh publication when the publisher has subscribed for the new table, but the subscriber-side wants origin = none" +); + +$node_A->wait_for_subscription_sync($node_B, $subname_AB2); + +# clear the operations done by this test +$node_A->safe_psql('postgres', "DROP TABLE tab_new"); +$node_B->safe_psql('postgres', "DROP TABLE tab_new"); +$node_A->safe_psql('postgres', "DROP SUBSCRIPTION $subname_AB2"); + # shutdown $node_B->stop('fast'); $node_A->stop('fast');