diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 64efc21f53..1e8d72062b 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -172,6 +172,11 @@ ALTER SUBSCRIPTION name RENAME TO < Previously subscribed tables are not copied, even if a table's row filter WHERE clause has since been modified. + + See <xref linkend="sql-createsubscription-notes"/> for details of + how copy_data = true can interact with the + origin parameter. + diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 7390c715bc..4e001f8111 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -213,6 +213,11 @@ CREATE SUBSCRIPTION subscription_name for details. + + See <xref linkend="sql-createsubscription-notes"/> for details of how + copy_data = true can interact with the + origin parameter. + @@ -315,6 +320,11 @@ CREATE SUBSCRIPTION subscription_nameany means that the publisher sends changes regardless of their origin. The default is any. + + See <xref linkend="sql-createsubscription-notes"/> for details of how + copy_data = true can interact with the + origin parameter. + @@ -386,6 +396,31 @@ CREATE SUBSCRIPTION subscription_name + + When using a subscription parameter combination of + copy_data = true and origin = NONE, + the initial sync table data is copied directly from the publisher, meaning + that the true origin of that data cannot be known. If the + publisher also has subscriptions then the copied table data might have + originated from further upstream. This scenario is detected and a WARNING is + logged to the user, but the warning is only an indication of a potential + problem; it is the user's responsibility to make the necessary checks to + verify that the origins of the copied data are as intended. 
+ + + + To find which tables might include non-local origins (due to + other subscriptions created on the publisher), try this SQL query: + +-- substitute <pub-names> below with your publication name(s) to be queried +SELECT DISTINCT N.nspname AS schemaname, C.relname AS tablename +FROM pg_publication P, + LATERAL pg_get_publication_tables(P.pubname) GPT + JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid), + pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace) +WHERE C.oid = GPT.relid AND P.pubname IN (<pub-names>); + + diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index f87796e5af..66d800f0cf 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -92,6 +92,10 @@ typedef struct SubOpts } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); +static void check_publications_origin(WalReceiverConn *wrconn, + List *publications, bool copydata, + char *origin, Oid *subrel_local_oids, + int subrel_count, char *subname); static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); @@ -680,6 +684,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, PG_TRY(); { check_publications(wrconn, publications); + check_publications_origin(wrconn, publications, opts.copy_data, + opts.origin, NULL, 0, stmt->subname); /* * Set sync state based on if we were asked to do data copy or @@ -786,6 +792,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, ListCell *lc; int off; int remove_rel_len; + int subrel_count; Relation rel = NULL; typedef struct SubRemoveRels { @@ -815,13 +822,14 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, /* Get local table list. 
*/ subrel_states = GetSubscriptionRelations(sub->oid, false); + subrel_count = list_length(subrel_states); /* * Build qsorted array of local table oids for faster lookup. This can * potentially contain all tables in the database so speed of lookup * is important. */ - subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid)); + subrel_local_oids = palloc(subrel_count * sizeof(Oid)); off = 0; foreach(lc, subrel_states) { @@ -829,14 +837,18 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, subrel_local_oids[off++] = relstate->relid; } - qsort(subrel_local_oids, list_length(subrel_states), + qsort(subrel_local_oids, subrel_count, sizeof(Oid), oid_cmp); + check_publications_origin(wrconn, sub->publications, copy_data, + sub->origin, subrel_local_oids, + subrel_count, sub->name); + /* * Rels that we want to remove from subscription and drop any slots * and origins corresponding to them. */ - sub_remove_rels = palloc(list_length(subrel_states) * sizeof(SubRemoveRels)); + sub_remove_rels = palloc(subrel_count * sizeof(SubRemoveRels)); /* * Walk over the remote tables and try to match them to locally known @@ -862,7 +874,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, pubrel_local_oids[off++] = relid; if (!bsearch(&relid, subrel_local_oids, - list_length(subrel_states), sizeof(Oid), oid_cmp)) + subrel_count, sizeof(Oid), oid_cmp)) { AddSubscriptionRelState(sub->oid, relid, copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, @@ -881,7 +893,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, sizeof(Oid), oid_cmp); remove_rel_len = 0; - for (off = 0; off < list_length(subrel_states); off++) + for (off = 0; off < subrel_count; off++) { Oid relid = subrel_local_oids[off]; @@ -1784,6 +1796,123 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId) table_close(rel, RowExclusiveLock); } +/* + * Check and log a warning if the publisher has subscribed to the same table + * from some other publisher. 
This check is required only if "copy_data = true" + * and "origin = none" for CREATE SUBSCRIPTION and + * ALTER SUBSCRIPTION ... REFRESH statements to notify the user that data + * originating from another node might have been copied. + * + * This check need not be performed on the tables that are already added + * because incremental sync for those tables will happen through WAL and the + * origin of the data can be identified from the WAL records. + * + * subrel_local_oids is an array of subrel_count relation oids that are + * already present on the subscriber; subname is only used in the warning. + */ +static void +check_publications_origin(WalReceiverConn *wrconn, List *publications, + bool copydata, char *origin, Oid *subrel_local_oids, + int subrel_count, char *subname) +{ + WalRcvExecResult *res; + StringInfoData cmd; + TupleTableSlot *slot; + Oid tableRow[1] = {TEXTOID}; + List *publist = NIL; + int i; + + if (!copydata || !origin || + (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0)) + return; + + initStringInfo(&cmd); + appendStringInfoString(&cmd, + "SELECT DISTINCT P.pubname AS pubname\n" + "FROM pg_publication P,\n" + " LATERAL pg_get_publication_tables(P.pubname) GPT\n" + " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n" + " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n" + "WHERE C.oid = GPT.relid AND P.pubname IN ("); + get_publications_str(publications, &cmd, true); + appendStringInfoString(&cmd, ")\n"); + + /* + * In case of ALTER SUBSCRIPTION ... REFRESH, subrel_local_oids contains + * the list of relation oids that are already present on the subscriber. + * This check should be skipped for these tables. 
+ */ + for (i = 0; i < subrel_count; i++) + { + Oid relid = subrel_local_oids[i]; + char *schemaname = get_namespace_name(get_rel_namespace(relid)); + char *tablename = get_rel_name(relid); + + /* No name to exclude if a lookup came back NULL (presumably a vanished relation). */ + if (schemaname == NULL || tablename == NULL) + continue; + + /* Quote as SQL literals so embedded quotes cannot break the remote query. */ + appendStringInfo(&cmd, "AND NOT (N.nspname = %s AND C.relname = %s)\n", + quote_literal_cstr(schemaname), + quote_literal_cstr(tablename)); + } + + res = walrcv_exec(wrconn, cmd.data, 1, tableRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not receive list of replicated tables from the publisher: %s", + res->err))); + + /* Collect the names of the publications returned by the query. */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + char *pubname; + bool isnull; + + pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + ExecClearTuple(slot); + publist = list_append_unique(publist, makeString(pubname)); + } + + /* + * Log a warning if the publisher has subscribed to the same table from + * some other publisher. We cannot know the origin of data during the + * initial sync. Data origins can be found only from the WAL by looking at + * the origin id. + * + * XXX: For simplicity, we don't check whether the table has any data or + * not. If the table doesn't have any data then we don't need to + * distinguish between data having origin and data not having origin so we + * can avoid logging a warning in that case. + */ + if (publist) + { + StringInfo pubnames = makeStringInfo(); + + /* Prepare the list of publication(s) for warning message. 
+ */ + get_publications_str(publist, pubnames, false); + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin", + subname), + errdetail_plural("Subscribed publication %s is subscribing to other publications.", + "Subscribed publications %s are subscribing to other publications.", + list_length(publist), pubnames->data), + errhint("Verify that initial data copied from the publisher tables did not come from other origins.")); + } + + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); +} + /* * Get the list of tables which belong to specified publications on the * publisher connection. diff --git a/src/test/subscription/t/030_origin.pl b/src/test/subscription/t/030_origin.pl index b297a51f7c..0a5cc4503b 100644 --- a/src/test/subscription/t/030_origin.pl +++ b/src/test/subscription/t/030_origin.pl @@ -1,13 +1,23 @@ # Copyright (c) 2021-2022, PostgreSQL Global Development Group -# Test the CREATE SUBSCRIPTION 'origin' parameter. +# Test the CREATE SUBSCRIPTION 'origin' parameter and its interaction with +# 'copy_data' parameter. use strict; use warnings; use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; +my $subname_AB = 'tap_sub_A_B'; +my $subname_AB2 = 'tap_sub_A_B_2'; +my $subname_BA = 'tap_sub_B_A'; +my $subname_BC = 'tap_sub_B_C'; + +my $result; +my $stdout; +my $stderr; + ############################################################################### # Setup a bidirectional logical replication between node_A & node_B ############################################################################### @@ -32,33 +42,29 @@ $node_B->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)"); # node_A (pub) -> node_B (sub) my $node_A_connstr = $node_A->connstr . 
' dbname=postgres'; $node_A->safe_psql('postgres', "CREATE PUBLICATION tap_pub_A FOR TABLE tab"); -my $appname_B1 = 'tap_sub_B1'; $node_B->safe_psql( 'postgres', " - CREATE SUBSCRIPTION tap_sub_B1 - CONNECTION '$node_A_connstr application_name=$appname_B1' + CREATE SUBSCRIPTION $subname_BA + CONNECTION '$node_A_connstr application_name=$subname_BA' PUBLICATION tap_pub_A WITH (origin = none)"); # node_B (pub) -> node_A (sub) my $node_B_connstr = $node_B->connstr . ' dbname=postgres'; $node_B->safe_psql('postgres', "CREATE PUBLICATION tap_pub_B FOR TABLE tab"); -my $appname_A = 'tap_sub_A'; $node_A->safe_psql( 'postgres', " - CREATE SUBSCRIPTION tap_sub_A - CONNECTION '$node_B_connstr application_name=$appname_A' + CREATE SUBSCRIPTION $subname_AB + CONNECTION '$node_B_connstr application_name=$subname_AB' PUBLICATION tap_pub_B WITH (origin = none, copy_data = off)"); # Wait for initial table sync to finish -$node_A->wait_for_subscription_sync($node_B, $appname_A); -$node_B->wait_for_subscription_sync($node_A, $appname_B1); +$node_A->wait_for_subscription_sync($node_B, $subname_AB); +$node_B->wait_for_subscription_sync($node_A, $subname_BA); is(1, 1, 'Bidirectional replication setup is complete'); -my $result; - ############################################################################### # Check that bidirectional logical replication setup does not cause infinite # recursive insertion. 
@@ -68,8 +74,8 @@ my $result; $node_A->safe_psql('postgres', "INSERT INTO tab VALUES (11);"); $node_B->safe_psql('postgres', "INSERT INTO tab VALUES (21);"); -$node_A->wait_for_catchup($appname_B1); -$node_B->wait_for_catchup($appname_A); +$node_A->wait_for_catchup($subname_BA); +$node_B->wait_for_catchup($subname_AB); # check that transaction was committed on subscriber(s) $result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;"); @@ -85,8 +91,8 @@ is( $result, qq(11 $node_A->safe_psql('postgres', "DELETE FROM tab;"); -$node_A->wait_for_catchup($appname_B1); -$node_B->wait_for_catchup($appname_A); +$node_A->wait_for_catchup($subname_BA); +$node_B->wait_for_catchup($subname_AB); ############################################################################### # Check that remote data of node_B (that originated from node_C) is not @@ -109,23 +115,20 @@ $node_C->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)"); # node_C (pub) -> node_B (sub) my $node_C_connstr = $node_C->connstr . 
' dbname=postgres'; $node_C->safe_psql('postgres', "CREATE PUBLICATION tap_pub_C FOR TABLE tab"); - -my $appname_B2 = 'tap_sub_B2'; $node_B->safe_psql( 'postgres', " - CREATE SUBSCRIPTION tap_sub_B2 - CONNECTION '$node_C_connstr application_name=$appname_B2' + CREATE SUBSCRIPTION $subname_BC + CONNECTION '$node_C_connstr application_name=$subname_BC' PUBLICATION tap_pub_C WITH (origin = none)"); - -$node_B->wait_for_subscription_sync($node_C, $appname_B2); +$node_B->wait_for_subscription_sync($node_C, $subname_BC); # insert a record $node_C->safe_psql('postgres', "INSERT INTO tab VALUES (32);"); -$node_C->wait_for_catchup($appname_B2); -$node_B->wait_for_catchup($appname_A); -$node_A->wait_for_catchup($appname_B1); +$node_C->wait_for_catchup($subname_BC); +$node_B->wait_for_catchup($subname_AB); +$node_A->wait_for_catchup($subname_BA); $result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;"); is($result, qq(32), 'The node_C data replicated to node_B'); @@ -136,6 +139,69 @@ is($result, qq(), 'Remote data originating from another node (not the publisher) is not replicated when origin parameter is none' ); +############################################################################### +# Specifying origin = NONE indicates that the publisher should only replicate the +# changes that are generated locally from node_B, but in this case since the +# node_B is also subscribing data from node_A, node_B can have remotely +# originated data from node_A. We log a warning, in this case, to draw +# attention to there being possible remote data. +############################################################################### +($result, $stdout, $stderr) = $node_A->psql( + 'postgres', " + CREATE SUBSCRIPTION $subname_AB2 + CONNECTION '$node_B_connstr application_name=$subname_AB2' + PUBLICATION tap_pub_B + WITH (origin = none, copy_data = on)"); +like( + $stderr, + qr/WARNING: ( [A-Z0-9]+:)? 
subscription "tap_sub_a_b_2" requested copy_data with origin = NONE but might copy data that had a different origin/, + "Create subscription with origin = none and copy_data when the publisher has subscribed same table" +); + +$node_A->wait_for_subscription_sync($node_B, $subname_AB2); + +# Alter subscription ... refresh publication should be successful when no new +# table is added +$node_A->safe_psql( + 'postgres', " + ALTER SUBSCRIPTION $subname_AB2 REFRESH PUBLICATION"); + +# Check Alter subscription ... refresh publication when there is a new +# table that is subscribing data from a different publication +$node_A->safe_psql('postgres', "CREATE TABLE tab_new (a int PRIMARY KEY)"); +$node_B->safe_psql('postgres', "CREATE TABLE tab_new (a int PRIMARY KEY)"); + +# add a new table to the publication +$node_A->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_A ADD TABLE tab_new"); +$node_B->safe_psql( + 'postgres', " + ALTER SUBSCRIPTION $subname_BA REFRESH PUBLICATION"); + +$node_B->wait_for_subscription_sync($node_A, $subname_BA); + +# add a new table to the publication +$node_B->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_B ADD TABLE tab_new"); + +# Alter subscription ... refresh publication should log a warning when a new +# table in the publisher is subscribing data from a different publication +($result, $stdout, $stderr) = $node_A->psql( + 'postgres', " + ALTER SUBSCRIPTION $subname_AB2 REFRESH PUBLICATION"); +like( + $stderr, + qr/WARNING: ( [A-Z0-9]+:)? 
subscription "tap_sub_a_b_2" requested copy_data with origin = NONE but might copy data that had a different origin/, + "Refresh publication when the publisher has subscribed for the new table, but the subscriber-side wants origin = none" +); + +$node_A->wait_for_subscription_sync($node_B, $subname_AB2); + +# clear the operations done by this test +$node_A->safe_psql('postgres', "DROP TABLE tab_new"); +$node_B->safe_psql('postgres', "DROP TABLE tab_new"); +$node_A->safe_psql('postgres', "DROP SUBSCRIPTION $subname_AB2"); + # shutdown $node_B->stop('fast'); $node_A->stop('fast');