diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 3e46bbdb04..fe13ab9a2d 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -114,7 +114,9 @@ ALTER SUBSCRIPTION name RENAME TO < replaces the entire list of publications with a new list, ADD adds additional publications to the list of publications, and DROP removes the publications from - the list of publications. See + the list of publications. We allow non-existent publications to be + specified in ADD and SET variants + so that users can add those later. See for more information. By default, this command will also act like REFRESH PUBLICATION. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index b701752fc9..ebf7db57c5 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -356,6 +356,13 @@ CREATE SUBSCRIPTION subscription_name + + We allow non-existent publications to be specified so that users can add + those later. This means + pg_subscription + can have non-existent publications. + + diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index abebffdf3b..85dacbe93d 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -375,6 +375,103 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, } } +/* + * Add publication names from the list to a string. + */ +static void +get_publications_str(List *publications, StringInfo dest, bool quote_literal) +{ + ListCell *lc; + bool first = true; + + Assert(list_length(publications) > 0); + + foreach(lc, publications) + { + char *pubname = strVal(lfirst(lc)); + + if (first) + first = false; + else + appendStringInfoString(dest, ", "); + + if (quote_literal) + appendStringInfoString(dest, quote_literal_cstr(pubname)); + else + { + appendStringInfoChar(dest, '"'); + appendStringInfoString(dest, pubname); + appendStringInfoChar(dest, '"'); + } + } +} + +/* + * Check the specified publication(s) is(are) present in the publisher. + */ +static void +check_publications(WalReceiverConn *wrconn, List *publications) +{ + WalRcvExecResult *res; + StringInfo cmd; + TupleTableSlot *slot; + List *publicationsCopy = NIL; + Oid tableRow[1] = {TEXTOID}; + + cmd = makeStringInfo(); + appendStringInfoString(cmd, "SELECT t.pubname FROM\n" + " pg_catalog.pg_publication t WHERE\n" + " t.pubname IN ("); + get_publications_str(publications, cmd, true); + appendStringInfoChar(cmd, ')'); + + res = walrcv_exec(wrconn, cmd->data, 1, tableRow); + pfree(cmd->data); + pfree(cmd); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + errmsg_plural("could not receive publication from the publisher: %s", + "could not receive list of publications from the publisher: %s", + list_length(publications), + res->err)); + + publicationsCopy = list_copy(publications); + + /* Process publication(s). */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + char *pubname; + bool isnull; + + pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + /* Delete the publication present in publisher from the list. */ + publicationsCopy = list_delete(publicationsCopy, makeString(pubname)); + ExecClearTuple(slot); + } + + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); + + if (list_length(publicationsCopy)) + { + /* Prepare the list of non-existent publication(s) for error message. */ + StringInfo pubnames = makeStringInfo(); + + get_publications_str(publicationsCopy, pubnames, false); + ereport(WARNING, + errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg_plural("publication %s does not exist in the publisher", + "publications %s do not exist in the publisher", + list_length(publicationsCopy), + pubnames->data)); + } +} + /* * Auxiliary function to build a text array out of a list of String nodes. */ @@ -555,6 +652,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, PG_TRY(); { + check_publications(wrconn, publications); + /* * Set sync state based on if we were asked to do data copy or * not. @@ -650,7 +749,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, } static void -AlterSubscription_refresh(Subscription *sub, bool copy_data) +AlterSubscription_refresh(Subscription *sub, bool copy_data, + List *validate_publications) { char *err; List *pubrel_names; @@ -681,6 +781,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) PG_TRY(); { + if (validate_publications) + check_publications(wrconn, validate_publications); + /* Get the list of relations from publisher. */ pubrel_names = fetch_table_list(wrconn, sub->publications); pubrel_names = list_concat(pubrel_names, @@ -1048,7 +1151,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, /* Make sure refresh sees the new list of publications. */ sub->publications = stmt->publication; - AlterSubscription_refresh(sub, opts.copy_data); + AlterSubscription_refresh(sub, opts.copy_data, + stmt->publication); } break; @@ -1074,6 +1178,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, /* Refresh if user asked us to. */ if (opts.refresh) { + /* We only need to validate user specified publications. */ + List *validate_publications = (isadd) ? stmt->publication : NULL; + if (!sub->enabled) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), @@ -1096,7 +1203,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, /* Refresh the new list of publications. */ sub->publications = publist; - AlterSubscription_refresh(sub, opts.copy_data); + AlterSubscription_refresh(sub, opts.copy_data, + validate_publications); } break; @@ -1138,7 +1246,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH"); - AlterSubscription_refresh(sub, opts.copy_data); + AlterSubscription_refresh(sub, opts.copy_data, NULL); break; } @@ -1659,28 +1767,13 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) StringInfoData cmd; TupleTableSlot *slot; Oid tableRow[2] = {TEXTOID, TEXTOID}; - ListCell *lc; - bool first; List *tablelist = NIL; - Assert(list_length(publications) > 0); - initStringInfo(&cmd); appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n" " FROM pg_catalog.pg_publication_tables t\n" " WHERE t.pubname IN ("); - first = true; - foreach(lc, publications) - { - char *pubname = strVal(lfirst(lc)); - - if (first) - first = false; - else - appendStringInfoString(&cmd, ", "); - - appendStringInfoString(&cmd, quote_literal_cstr(pubname)); - } + get_publications_str(publications, &cmd, true); appendStringInfoChar(&cmd, ')'); res = walrcv_exec(wrconn, cmd.data, 2, tableRow); diff --git a/src/test/subscription/t/007_ddl.pl b/src/test/subscription/t/007_ddl.pl index 1144b005f6..39c32eda44 100644 --- a/src/test/subscription/t/007_ddl.pl +++ b/src/test/subscription/t/007_ddl.pl @@ -41,6 +41,43 @@ COMMIT; pass "subscription disable and drop in same transaction did not hang"; +# One of the specified publications exists. +my ($ret, $stdout, $stderr) = $node_subscriber->psql('postgres', + "CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr' PUBLICATION mypub, non_existent_pub" +); +ok( $stderr =~ + m/WARNING: publication "non_existent_pub" does not exist in the publisher/, + "Create subscription throws warning for non-existent publication"); + +$node_publisher->wait_for_catchup('mysub1'); + +# Also wait for initial table sync to finish. +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Also wait for initial table sync to finish. +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Specifying non-existent publication along with add publication. +($ret, $stdout, $stderr) = $node_subscriber->psql( + 'postgres', + "ALTER SUBSCRIPTION mysub1 ADD PUBLICATION non_existent_pub1, non_existent_pub2" +); +ok( $stderr =~ + m/WARNING: publications "non_existent_pub1", "non_existent_pub2" do not exist in the publisher/, + "Alter subscription add publication throws warning for non-existent publications"); + +# Specifying non-existent publication along with set publication. +($ret, $stdout, $stderr) = $node_subscriber->psql('postgres', + "ALTER SUBSCRIPTION mysub1 SET PUBLICATION non_existent_pub" +); +ok( $stderr =~ + m/WARNING: publication "non_existent_pub" does not exist in the publisher/, + "Alter subscription set publication throws warning for non-existent publication"); + $node_subscriber->stop; $node_publisher->stop;